This is an automated email from the ASF dual-hosted git repository.

thinkharderdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git


The following commit(s) were added to refs/heads/main by this push:
     new 22029020 Add executor terminating status for graceful shutdown (#667)
22029020 is described below

commit 2202902072e54dbb558b69212356ca1655bfccf8
Author: Dan Harris <[email protected]>
AuthorDate: Sat Feb 25 07:24:19 2023 -0500

    Add executor terminating status for graceful shutdown (#667)
    
    * Add executor terminating status for graceful shutdown
    
    * Remove empty file
    
    * Update ballista/executor/src/executor_process.rs
    
    Co-authored-by: Brent Gardner <[email protected]>
    
    ---------
    
    Co-authored-by: Brent Gardner <[email protected]>
---
 ballista/core/proto/ballista.proto               |   1 +
 ballista/core/src/serde/generated/ballista.rs    |   4 +-
 ballista/executor/src/executor.rs                |  21 ++++
 ballista/executor/src/executor_process.rs        |  60 +++++++++--
 ballista/executor/src/executor_server.rs         |  13 ++-
 ballista/scheduler/scheduler_config_spec.toml    |   6 ++
 ballista/scheduler/src/bin/main.rs               |   1 +
 ballista/scheduler/src/config.rs                 |   9 ++
 ballista/scheduler/src/scheduler_server/grpc.rs  |  34 ++++--
 ballista/scheduler/src/scheduler_server/mod.rs   | 130 ++++++++++++++---------
 ballista/scheduler/src/state/executor_manager.rs | 117 ++++++++++++++++++--
 11 files changed, 316 insertions(+), 80 deletions(-)

diff --git a/ballista/core/proto/ballista.proto b/ballista/core/proto/ballista.proto
index c169791d..722baaa9 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -323,6 +323,7 @@ message ExecutorStatus {
     string active = 1;
     string dead = 2;
     string unknown = 3;
+    string terminating = 4;
   }
 }
 
diff --git a/ballista/core/src/serde/generated/ballista.rs b/ballista/core/src/serde/generated/ballista.rs
index 28236ad0..f511f2a2 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -540,7 +540,7 @@ pub mod executor_metric {
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct ExecutorStatus {
-    #[prost(oneof = "executor_status::Status", tags = "1, 2, 3")]
+    #[prost(oneof = "executor_status::Status", tags = "1, 2, 3, 4")]
     pub status: ::core::option::Option<executor_status::Status>,
 }
 /// Nested message and enum types in `ExecutorStatus`.
@@ -554,6 +554,8 @@ pub mod executor_status {
         Dead(::prost::alloc::string::String),
         #[prost(string, tag = "3")]
         Unknown(::prost::alloc::string::String),
+        #[prost(string, tag = "4")]
+        Terminating(::prost::alloc::string::String),
     }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
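
For downstream code, the regenerated prost types mean the new status is built
and matched like the existing variants. A minimal sketch (paths taken from this
diff; the `main` wrapper is only for illustration):

    use ballista_core::serde::protobuf::{executor_status::Status, ExecutorStatus};

    fn main() {
        // Build the new status; the string payload is unused by this change,
        // so it stays empty.
        let status = ExecutorStatus {
            status: Some(Status::Terminating(String::default())),
        };

        // Consumers match on the variant and ignore the payload.
        assert!(matches!(status.status, Some(Status::Terminating(_))));
    }
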
diff --git a/ballista/executor/src/executor.rs b/ballista/executor/src/executor.rs
index d903db69..867b3ba8 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -19,7 +19,10 @@
 
 use dashmap::DashMap;
 use std::collections::HashMap;
+use std::future::Future;
+use std::pin::Pin;
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use crate::metrics::ExecutorMetricsCollector;
 use ballista_core::error::BallistaError;
@@ -37,6 +40,20 @@ use futures::future::AbortHandle;
 
 use ballista_core::serde::scheduler::PartitionId;
 
+pub struct TasksDrainedFuture(pub Arc<Executor>);
+
+impl Future for TasksDrainedFuture {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
+        if self.0.abort_handles.len() > 0 {
+            Poll::Pending
+        } else {
+            Poll::Ready(())
+        }
+    }
+}
+
 type AbortHandles = Arc<DashMap<(usize, PartitionId), AbortHandle>>;
 
 /// Ballista executor
@@ -175,6 +192,10 @@ impl Executor {
     pub fn work_dir(&self) -> &str {
         &self.work_dir
     }
+
+    pub fn active_task_count(&self) -> usize {
+        self.abort_handles.len()
+    }
 }
 
 #[cfg(test)]
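
`TasksDrainedFuture` resolves once the executor's `abort_handles` map is empty.
Note that its `poll` returns `Pending` without registering a waker, so it only
completes when the caller re-polls it. A hypothetical helper (not part of this
commit) with the same effect, built on the new `active_task_count` accessor and
a timer:

    use std::sync::Arc;
    use std::time::Duration;

    use crate::executor::Executor;

    // Re-check the running-task count every 100ms until everything has drained.
    async fn wait_for_drain(executor: Arc<Executor>) {
        while executor.active_task_count() > 0 {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
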
diff --git a/ballista/executor/src/executor_process.rs b/ballista/executor/src/executor_process.rs
index 8ec76038..6db3de06 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -18,6 +18,7 @@
 //! Ballista Executor Process
 
 use std::net::SocketAddr;
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 use std::{env, io};
@@ -41,18 +42,21 @@ use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
 
 use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
 use ballista_core::error::BallistaError;
+use ballista_core::serde::protobuf::executor_resource::Resource;
+use ballista_core::serde::protobuf::executor_status::Status;
 use ballista_core::serde::protobuf::{
     executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
-    ExecutorRegistration, ExecutorStoppedParams,
+    ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus,
+    ExecutorStoppedParams, HeartBeatParams,
 };
-use ballista_core::serde::scheduler::ExecutorSpecification;
 use ballista_core::serde::BallistaCodec;
 use ballista_core::utils::{
     create_grpc_client_connection, create_grpc_server, with_object_store_provider,
 };
 use ballista_core::BALLISTA_VERSION;
 
-use crate::executor::Executor;
+use crate::executor::{Executor, TasksDrainedFuture};
+use crate::executor_server::TERMINATING;
 use crate::flight_service::BallistaFlightService;
 use crate::metrics::LoggingMetricsCollector;
 use crate::shutdown::Shutdown;
@@ -155,12 +159,11 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
             .map(executor_registration::OptionalHost::Host),
         port: opt.port as u32,
         grpc_port: opt.grpc_port as u32,
-        specification: Some(
-            ExecutorSpecification {
-                task_slots: concurrent_tasks as u32,
-            }
-            .into(),
-        ),
+        specification: Some(ExecutorSpecification {
+            resources: vec![ExecutorResource {
+                resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
+            }],
+        }),
     };
 
     let config = with_object_store_provider(
@@ -295,6 +298,8 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
         shutdown_noti.subscribe_for_shutdown(),
     )));
 
+    let tasks_drained = TasksDrainedFuture(executor);
+
     // Concurrently run the service checking and listen for the `shutdown` signal and wait for the stop request coming.
     // The check_services runs until an error is encountered, so under normal circumstances, this `select!` statement runs
     // until the `shutdown` signal is received or a stop request is coming.
@@ -319,7 +324,41 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
         },
     };
 
+    // Set status to fenced
+    info!("setting executor to TERMINATING status");
+    TERMINATING.store(true, Ordering::Release);
+
     if notify_scheduler {
+        // Send a heartbeat to update status of executor to `Fenced`. This should signal to the
+        // scheduler to no longer schedule tasks on this executor
+        if let Err(error) = scheduler
+            .heart_beat_from_executor(HeartBeatParams {
+                executor_id: executor_id.clone(),
+                metrics: vec![],
+                status: Some(ExecutorStatus {
+                    status: Some(Status::Terminating(String::default())),
+                }),
+                metadata: Some(ExecutorRegistration {
+                    id: executor_id.clone(),
+                    optional_host: opt
+                        .external_host
+                        .clone()
+                        .map(executor_registration::OptionalHost::Host),
+                    port: opt.port as u32,
+                    grpc_port: opt.grpc_port as u32,
+                    specification: Some(ExecutorSpecification {
+                        resources: vec![ExecutorResource {
+                            resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
+                        }],
+                    }),
+                }),
+            })
+            .await
+        {
+            error!("error sending heartbeat with fenced status: {:?}", error);
+        }
+
+        // TODO we probably don't need a separate rpc call for this....
         if let Err(error) = scheduler
             .executor_stopped(ExecutorStoppedParams {
                 executor_id,
@@ -329,6 +368,9 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
         {
             error!("ExecutorStopped grpc failed: {:?}", error);
         }
+
+        // Wait for tasks to drain
+        tasks_drained.await;
     }
 
     // Extract the `shutdown_complete` receiver and transmitter
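
Taken together, the additions to this file give the shutdown path four ordered
steps. A hypothetical helper (not in this commit) gathering what the diff does
inline; `heartbeat` and `stopped` stand in for the argument structs built above:

    use std::sync::atomic::Ordering;

    use ballista_core::serde::protobuf::{
        scheduler_grpc_client::SchedulerGrpcClient, ExecutorStoppedParams, HeartBeatParams,
    };
    use tonic::transport::Channel;

    use crate::executor::TasksDrainedFuture;
    use crate::executor_server::TERMINATING;

    async fn shutdown_sequence(
        mut scheduler: SchedulerGrpcClient<Channel>,
        heartbeat: HeartBeatParams,
        stopped: ExecutorStoppedParams,
        tasks_drained: TasksDrainedFuture,
    ) -> Result<(), tonic::Status> {
        // 1. Publish the flag so every later heartbeat reports Terminating.
        TERMINATING.store(true, Ordering::Release);
        // 2. Send one heartbeat immediately so the scheduler stops assigning work.
        scheduler.heart_beat_from_executor(heartbeat).await?;
        // 3. Tell the scheduler this executor is stopping (the TODO above notes
        //    this may be folded into the heartbeat later).
        scheduler.executor_stopped(stopped).await?;
        // 4. Wait for in-flight tasks to drain before tearing down.
        tasks_drained.await;
        Ok(())
    }
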
diff --git a/ballista/executor/src/executor_server.rs b/ballista/executor/src/executor_server.rs
index 2372b6ca..89f2eef5 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
 use std::convert::TryInto;
 use std::ops::Deref;
 use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::sync::mpsc;
@@ -195,6 +196,10 @@ struct ExecutorEnv {
 
 unsafe impl Sync for ExecutorEnv {}
 
+/// Global flag indicating whether the executor is terminating. This should be
+/// set to `true` when the executor receives a shutdown signal
+pub static TERMINATING: AtomicBool = AtomicBool::new(false);
+
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T, U> {
     fn new(
         scheduler_to_register: SchedulerGrpcClient<Channel>,
@@ -240,11 +245,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
     /// 1. First Heartbeat to its registration scheduler, if successful then return; else go next.
     /// 2. Heartbeat to schedulers which has launching tasks to this executor until one succeeds
     async fn heartbeat(&self) {
+        let status = if TERMINATING.load(Ordering::Acquire) {
+            executor_status::Status::Terminating(String::default())
+        } else {
+            executor_status::Status::Active(String::default())
+        };
+
         let heartbeat_params = HeartBeatParams {
             executor_id: self.executor.metadata.id.clone(),
             metrics: self.get_executor_metrics(),
             status: Some(ExecutorStatus {
-                status: Some(executor_status::Status::Active("".to_string())),
+                status: Some(status),
             }),
             metadata: Some(self.executor.metadata.clone()),
         };
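
The `Release` store performed on the shutdown path pairs with this `Acquire`
load, so once the flag flips, the heartbeat loop is guaranteed to observe it.
The pattern in isolation (helper name is illustrative, not from the diff):

    use std::sync::atomic::{AtomicBool, Ordering};

    static TERMINATING: AtomicBool = AtomicBool::new(false);

    fn heartbeat_status_label() -> &'static str {
        // Acquire pairs with the Release store performed at shutdown.
        if TERMINATING.load(Ordering::Acquire) {
            "Terminating"
        } else {
            "Active"
        }
    }
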
diff --git a/ballista/scheduler/scheduler_config_spec.toml b/ballista/scheduler/scheduler_config_spec.toml
index f6575b43..93ffe4c1 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -141,3 +141,9 @@ name = "job_resubmit_interval_ms"
 type = "u64"
 default = "0"
 doc = "If job is not able to be scheduled on submission, wait for this 
interval and resubmit. Default value of 0 indicates that job shuuld not be 
resubmitted"
+
+[[param]]
+name = "executor_termination_grace_period"
+type = "u64"
+default = "30"
+doc = "Time in seconds an executor should be considered lost after it enters 
terminating status"
diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs
index 8be0078a..93904901 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -118,6 +118,7 @@ async fn main() -> Result<()> {
         cluster_storage: ClusterStorageConfig::Memory,
         job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
             .then_some(opt.job_resubmit_interval_ms),
+        executor_termination_grace_period: opt.executor_termination_grace_period,
     };
 
     let cluster = BallistaCluster::new_from_config(&config).await?;
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index ddd05b78..087e3867 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -49,6 +49,9 @@ pub struct SchedulerConfig {
     pub job_resubmit_interval_ms: Option<u64>,
     /// Configuration for ballista cluster storage
     pub cluster_storage: ClusterStorageConfig,
+    /// Time in seconds to allow executor for graceful shutdown. Once an executor signals it has entered Terminating status
+    /// the scheduler should only consider the executor dead after this time interval has elapsed
+    pub executor_termination_grace_period: u64,
 }
 
 impl Default for SchedulerConfig {
@@ -65,6 +68,7 @@ impl Default for SchedulerConfig {
             advertise_flight_sql_endpoint: None,
             cluster_storage: ClusterStorageConfig::Memory,
             job_resubmit_interval_ms: None,
+            executor_termination_grace_period: 0,
         }
     }
 }
@@ -141,6 +145,11 @@ impl SchedulerConfig {
         self.job_resubmit_interval_ms = Some(interval_ms);
         self
     }
+
+    pub fn with_remove_executor_wait_secs(mut self, value: u64) -> Self {
+        self.executor_termination_grace_period = value;
+        self
+    }
 }
 
 #[derive(Clone, Debug)]
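
The `Default` impl keeps the grace period at 0, meaning terminating executors
are removed immediately; the gRPC tests below rely on that. Raising it through
the new builder looks like this sketch (module path assumed public):

    use ballista_scheduler::config::SchedulerConfig;

    fn main() {
        let config = SchedulerConfig::default().with_remove_executor_wait_secs(30);
        assert_eq!(config.executor_termination_grace_period, 30);
    }
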
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs b/ballista/scheduler/src/scheduler_server/grpc.rs
index bff078d0..de07a06d 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -258,6 +258,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             metrics,
             status,
         };
+
         self.state
             .executor_manager
             .save_executor_heartbeat(executor_heartbeat)
@@ -521,13 +522,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
             error!("{}", msg);
             Status::internal(msg)
         })?;
-        Self::remove_executor(executor_manager, event_sender, &executor_id, Some(reason))
-            .await
-            .map_err(|e| {
-                let msg = format!("Error to remove executor in Scheduler due 
to {e:?}");
-                error!("{}", msg);
-                Status::internal(msg)
-            })?;
+
+        Self::remove_executor(
+            executor_manager,
+            event_sender,
+            &executor_id,
+            Some(reason),
+            self.executor_termination_grace_period,
+        );
 
         Ok(Response::new(ExecutorStoppedResult {}))
     }
@@ -603,6 +605,7 @@ mod test {
 
     use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
     use crate::state::SchedulerState;
+    use crate::test_utils::await_condition;
     use crate::test_utils::test_cluster_context;
 
     use super::{SchedulerGrpc, SchedulerServer};
@@ -702,7 +705,7 @@ mod test {
                 "localhost:50050".to_owned(),
                 cluster.clone(),
                 BallistaCodec::default(),
-                SchedulerConfig::default(),
+                SchedulerConfig::default().with_remove_executor_wait_secs(0),
                 default_metrics_collector().unwrap(),
             );
         scheduler.init().await?;
@@ -760,15 +763,22 @@ mod test {
             .await
             .expect("getting executor");
 
+        let is_stopped = await_condition(Duration::from_millis(10), 5, || {
+            futures::future::ready(Ok(state.executor_manager.is_dead_executor("abc")))
+        })
+        .await?;
+
         // executor should be marked to dead
-        assert!(state.executor_manager.is_dead_executor("abc"));
+        assert!(is_stopped, "Executor not marked dead after 50ms");
 
         let active_executors = state
             .executor_manager
             .get_alive_executors_within_one_minute();
         assert!(active_executors.is_empty());
 
-        let expired_executors = state.executor_manager.get_expired_executors();
+        let expired_executors = state
+            .executor_manager
+            .get_expired_executors(scheduler.executor_termination_grace_period);
         assert!(expired_executors.is_empty());
 
         Ok(())
@@ -895,7 +905,9 @@ mod test {
             .get_alive_executors_within_one_minute();
         assert_eq!(active_executors.len(), 1);
 
-        let expired_executors = state.executor_manager.get_expired_executors();
+        let expired_executors = state
+            .executor_manager
+            .get_expired_executors(scheduler.executor_termination_grace_period);
         assert!(expired_executors.is_empty());
 
         // simulate the heartbeat timeout
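
Because removal is now asynchronous, the test polls for the dead state rather
than asserting immediately. `await_condition` is a test helper; a simplified,
hypothetical stand-in for what it does (the real helper takes a future-returning
closure and returns a `Result`):

    use std::time::Duration;

    // Retry a boolean check up to `attempts` times, sleeping `interval` between tries.
    async fn poll_until(interval: Duration, attempts: usize, check: impl Fn() -> bool) -> bool {
        for _ in 0..attempts {
            if check() {
                return true;
            }
            tokio::time::sleep(interval).await;
        }
        false
    }
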
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs b/ballista/scheduler/src/scheduler_server/mod.rs
index de37365b..fc40b017 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -40,6 +40,7 @@ use crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
 
 use crate::state::executor_manager::{
     ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
+    EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
 };
 
 use crate::state::task_manager::TaskLauncher;
@@ -65,6 +66,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
     pub(crate) state: Arc<SchedulerState<T, U>>,
     pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
     query_stage_scheduler: Arc<QueryStageScheduler<T, U>>,
+    executor_termination_grace_period: u64,
 }
 
 impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T, U> {
@@ -98,6 +100,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             state,
             query_stage_event_loop,
             query_stage_scheduler,
+            executor_termination_grace_period: config.executor_termination_grace_period,
         }
     }
 
@@ -134,6 +137,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
             state,
             query_stage_event_loop,
             query_stage_scheduler,
+            executor_termination_grace_period: config.executor_termination_grace_period,
         }
     }
 
@@ -216,83 +220,113 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
     fn expire_dead_executors(&self) -> Result<()> {
         let state = self.state.clone();
         let event_sender = self.query_stage_event_loop.get_sender()?;
+        let termination_grace_period = self.executor_termination_grace_period;
         tokio::task::spawn(async move {
             loop {
-                let expired_executors = state.executor_manager.get_expired_executors();
+                let expired_executors = state
+                    .executor_manager
+                    .get_expired_executors(termination_grace_period);
                 for expired in expired_executors {
                     let executor_id = expired.executor_id.clone();
                     let executor_manager = state.executor_manager.clone();
-                    let stop_reason = format!(
-                        "Executor {} heartbeat timed out after {}s",
-                        executor_id.clone(),
-                        DEFAULT_EXECUTOR_TIMEOUT_SECONDS
-                    );
-                    warn!("{}", stop_reason.clone());
+
                     let sender_clone = event_sender.clone();
+
+                    let terminating = matches!(
+                        expired
+                            .status
+                            .as_ref()
+                            .and_then(|status| status.status.as_ref()),
+                        Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_))
+                    );
+
+                    let stop_reason = if terminating {
+                        format!(
+                        "TERMINATING executor {executor_id} heartbeat timed 
out after {termination_grace_period}s"
+                    )
+                    } else {
+                        format!(
+                            "ACTIVE executor {executor_id} heartbeat timed out 
after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s",
+                        )
+                    };
+
+                    warn!("{stop_reason}");
+
+                    // If executor is expired, remove it immediately
                     Self::remove_executor(
                         executor_manager,
                         sender_clone,
                         &executor_id,
                         Some(stop_reason.clone()),
-                    )
-                    .await
-                    .unwrap_or_else(|e| {
-                        let msg =
-                            format!("Error to remove Executor in Scheduler due 
to {e:?}");
-                        error!("{}", msg);
-                    });
+                        0,
+                    );
 
-                    match state.executor_manager.get_client(&executor_id).await {
-                        Ok(mut client) => {
-                            tokio::task::spawn(async move {
-                                match client
-                                    .stop_executor(StopExecutorParams {
-                                        executor_id,
-                                        reason: stop_reason,
-                                        force: true,
-                                    })
-                                    .await
-                                {
-                                    Err(error) => {
-                                        warn!(
+                    // If executor is not already terminating then stop it. If it is terminating then it should already be shutting
+                    // down and we do not need to do anything here.
+                    if !terminating {
+                        match state.executor_manager.get_client(&executor_id).await {
+                            Ok(mut client) => {
+                                tokio::task::spawn(async move {
+                                    match client
+                                        .stop_executor(StopExecutorParams {
+                                            executor_id,
+                                            reason: stop_reason,
+                                            force: true,
+                                        })
+                                        .await
+                                    {
+                                        Err(error) => {
+                                            warn!(
                                             "Failed to send stop_executor rpc 
due to, {}",
                                             error
                                         );
+                                        }
+                                        Ok(_value) => {}
                                     }
-                                    Ok(_value) => {}
-                                }
-                            });
-                        }
-                        Err(_) => {
-                            warn!("Executor is already dead, failed to connect 
to Executor {}", executor_id);
+                                });
+                            }
+                            Err(_) => {
+                                warn!("Executor is already dead, failed to 
connect to Executor {}", executor_id);
+                            }
                         }
                     }
                 }
-                tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
-                    .await;
+                tokio::time::sleep(Duration::from_secs(
+                    EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
+                ))
+                .await;
             }
         });
         Ok(())
     }
 
-    pub(crate) async fn remove_executor(
+    pub(crate) fn remove_executor(
         executor_manager: ExecutorManager,
         event_sender: EventSender<QueryStageSchedulerEvent>,
         executor_id: &str,
         reason: Option<String>,
-    ) -> Result<()> {
-        // Update the executor manager immediately here
-        executor_manager
-            .remove_executor(executor_id, reason.clone())
-            .await?;
+        wait_secs: u64,
+    ) {
+        let executor_id = executor_id.to_owned();
+        tokio::spawn(async move {
+            // Wait for `wait_secs` before removing executor
+            tokio::time::sleep(Duration::from_secs(wait_secs)).await;
+
+            // Update the executor manager immediately here
+            if let Err(e) = executor_manager
+                .remove_executor(&executor_id, reason.clone())
+                .await
+            {
+                error!("error removing executor {executor_id}: {e:?}");
+            }
 
-        event_sender
-            .post_event(QueryStageSchedulerEvent::ExecutorLost(
-                executor_id.to_owned(),
-                reason,
-            ))
-            .await?;
-        Ok(())
+            if let Err(e) = event_sender
+                .post_event(QueryStageSchedulerEvent::ExecutorLost(executor_id, reason))
+                .await
+            {
+                error!("error sending ExecutorLost event: {e:?}");
+            }
+        });
     }
 
     async fn do_register_executor(&self, metadata: ExecutorMetadata) -> Result<()> {
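
`remove_executor` is now synchronous at the call site: it spawns the delayed
work and returns, logging failures instead of propagating them, since no caller
awaits the result on the detached task. The scheduling pattern in isolation
(hypothetical helper, removal body elided):

    use std::time::Duration;

    // The caller returns immediately while the spawned task sleeps out the
    // grace period before acting.
    fn remove_after_grace_period(executor_id: String, wait_secs: u64) {
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_secs(wait_secs)).await;
            // ...remove executor state and post ExecutorLost, as in the diff.
            log::info!("removing executor {executor_id}");
        });
    }
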
diff --git a/ballista/scheduler/src/state/executor_manager.rs b/ballista/scheduler/src/state/executor_manager.rs
index 9a51aa77..bec97c3b 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -85,6 +85,10 @@ impl ExecutorReservation {
 /// to be dead.
 pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
 
+// TODO move to configuration file
+/// Interval check for expired or dead executors
+pub const EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS: u64 = 15;
+
 #[derive(Clone)]
 pub(crate) struct ExecutorManager {
     // executor slot policy
@@ -140,14 +144,19 @@ impl ExecutorManager {
         tokio::task::spawn(async move {
             while let Some(heartbeat) = heartbeat_stream.next().await {
                 let executor_id = heartbeat.executor_id.clone();
-                if let Some(ExecutorStatus {
-                    status: Some(executor_status::Status::Dead(_)),
-                }) = heartbeat.status
+
+                match heartbeat
+                    .status
+                    .as_ref()
+                    .and_then(|status| status.status.as_ref())
                 {
-                    heartbeats.remove(&executor_id);
-                    dead_executors.insert(executor_id);
-                } else {
-                    heartbeats.insert(executor_id, heartbeat);
+                    Some(executor_status::Status::Dead(_)) => {
+                        heartbeats.remove(&executor_id);
+                        dead_executors.insert(executor_id);
+                    }
+                    _ => {
+                        heartbeats.insert(executor_id, heartbeat);
+                    }
                 }
             }
         });
@@ -614,27 +623,62 @@ impl ExecutorManager {
             .iter()
             .filter_map(|pair| {
                 let (exec, heartbeat) = pair.pair();
-                (heartbeat.timestamp > last_seen_ts_threshold).then(|| exec.clone())
+
+                let active = matches!(
+                    heartbeat
+                        .status
+                        .as_ref()
+                        .and_then(|status| status.status.as_ref()),
+                    Some(executor_status::Status::Active(_))
+                );
+                let live = heartbeat.timestamp > last_seen_ts_threshold;
+
+                (active && live).then(|| exec.clone())
             })
             .collect()
     }
 
     /// Return a list of expired executors
-    pub(crate) fn get_expired_executors(&self) -> Vec<ExecutorHeartbeat> {
+    pub(crate) fn get_expired_executors(
+        &self,
+        termination_grace_period: u64,
+    ) -> Vec<ExecutorHeartbeat> {
         let now_epoch_ts = SystemTime::now()
             .duration_since(UNIX_EPOCH)
             .expect("Time went backwards");
+        // Threshold for last heartbeat from Active executor before marking dead
         let last_seen_threshold = now_epoch_ts
             .checked_sub(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
             .unwrap_or_else(|| Duration::from_secs(0))
             .as_secs();
 
+        // Threshold for last heartbeat for Fenced executor before marking dead
+        let termination_wait_threshold = now_epoch_ts
+            .checked_sub(Duration::from_secs(termination_grace_period))
+            .unwrap_or_else(|| Duration::from_secs(0))
+            .as_secs();
+
         let expired_executors = self
             .executors_heartbeat
             .iter()
             .filter_map(|pair| {
                 let (_exec, heartbeat) = pair.pair();
-                (heartbeat.timestamp <= last_seen_threshold).then(|| 
heartbeat.clone())
+
+                let terminating = matches!(
+                    heartbeat
+                        .status
+                        .as_ref()
+                        .and_then(|status| status.status.as_ref()),
+                    Some(executor_status::Status::Terminating(_))
+                );
+
+                let grace_period_expired =
+                    heartbeat.timestamp <= termination_wait_threshold;
+
+                let expired = heartbeat.timestamp <= last_seen_threshold;
+
+                ((terminating && grace_period_expired) || expired)
+                    .then(|| heartbeat.clone())
             })
             .collect::<Vec<_>>();
         expired_executors
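
A worked example of the two cutoffs: with the 180s default timeout and a 30s
grace period, an executor whose last heartbeat is 45s old is expired if it is
Terminating but still alive if it is Active (timestamps illustrative):

    fn main() {
        let now: u64 = 1_000;
        let heartbeat_ts = now - 45; // last heartbeat 45s ago
        let last_seen_threshold = now - 180; // Active cutoff
        let termination_wait_threshold = now - 30; // Terminating cutoff

        // Terminating executor: 955 <= 970, so the grace period has lapsed.
        assert!(heartbeat_ts <= termination_wait_threshold);
        // Active executor: 955 > 820, so it is not yet considered dead.
        assert!(heartbeat_ts > last_seen_threshold);
    }
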
@@ -656,9 +700,12 @@ mod test {
 
     use crate::config::SlotsPolicy;
 
+    use crate::scheduler_server::timestamp_secs;
     use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
     use crate::test_utils::test_cluster_context;
     use ballista_core::error::Result;
+    use ballista_core::serde::protobuf::executor_status::Status;
+    use ballista_core::serde::protobuf::{ExecutorHeartbeat, ExecutorStatus};
     use ballista_core::serde::scheduler::{
         ExecutorData, ExecutorMetadata, ExecutorSpecification,
     };
@@ -844,6 +891,56 @@ mod test {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_ignore_fenced_executors() -> Result<()> {
+        test_ignore_fenced_executors_inner(SlotsPolicy::Bias).await?;
+        test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobin).await?;
+        test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobinLocal).await?;
+
+        Ok(())
+    }
+
+    async fn test_ignore_fenced_executors_inner(slots_policy: SlotsPolicy) -> Result<()> {
+        let cluster = test_cluster_context();
+
+        let executor_manager =
+            ExecutorManager::new(cluster.cluster_state(), slots_policy);
+
+        // Setup two executors initially
+        let executors = test_executors(2, 4);
+
+        for (executor_metadata, executor_data) in executors {
+            let _ = executor_manager
+                .register_executor(executor_metadata, executor_data, false)
+                .await?;
+        }
+
+        // Fence one of the executors
+        executor_manager
+            .save_executor_heartbeat(ExecutorHeartbeat {
+                executor_id: "executor-0".to_string(),
+                timestamp: timestamp_secs(),
+                metrics: vec![],
+                status: Some(ExecutorStatus {
+                    status: Some(Status::Terminating(String::default())),
+                }),
+            })
+            .await?;
+
+        let reservations = executor_manager.reserve_slots(8).await?;
+
+        assert_eq!(reservations.len(), 4, "Expected only four reservations");
+
+        assert!(
+            reservations
+                .iter()
+                .all(|res| res.executor_id == "executor-1"),
+            "Expected all reservations from non-fenced executor",
+        );
+
+        Ok(())
+    }
+
     fn test_executors(
         total_executors: usize,
         slots_per_executor: u32,
