This is an automated email from the ASF dual-hosted git repository.
thinkharderdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-ballista.git
The following commit(s) were added to refs/heads/main by this push:
new 22029020 Add executor terminating status for graceful shutdown (#667)
22029020 is described below
commit 2202902072e54dbb558b69212356ca1655bfccf8
Author: Dan Harris <[email protected]>
AuthorDate: Sat Feb 25 07:24:19 2023 -0500
Add executor terminating status for graceful shutdown (#667)
* Add executor terminating status for graceful shutdown
* Remove empty file
* Update ballista/executor/src/executor_process.rs
Co-authored-by: Brent Gardner <[email protected]>
---------
Co-authored-by: Brent Gardner <[email protected]>
---
ballista/core/proto/ballista.proto | 1 +
ballista/core/src/serde/generated/ballista.rs | 4 +-
ballista/executor/src/executor.rs | 21 ++++
ballista/executor/src/executor_process.rs | 60 +++++++++--
ballista/executor/src/executor_server.rs | 13 ++-
ballista/scheduler/scheduler_config_spec.toml | 6 ++
ballista/scheduler/src/bin/main.rs | 1 +
ballista/scheduler/src/config.rs | 9 ++
ballista/scheduler/src/scheduler_server/grpc.rs | 34 ++++--
ballista/scheduler/src/scheduler_server/mod.rs | 130 ++++++++++++++---------
ballista/scheduler/src/state/executor_manager.rs | 117 ++++++++++++++++++--
11 files changed, 316 insertions(+), 80 deletions(-)
diff --git a/ballista/core/proto/ballista.proto
b/ballista/core/proto/ballista.proto
index c169791d..722baaa9 100644
--- a/ballista/core/proto/ballista.proto
+++ b/ballista/core/proto/ballista.proto
@@ -323,6 +323,7 @@ message ExecutorStatus {
string active = 1;
string dead = 2;
string unknown = 3;
+ string terminating = 4;
}
}
diff --git a/ballista/core/src/serde/generated/ballista.rs
b/ballista/core/src/serde/generated/ballista.rs
index 28236ad0..f511f2a2 100644
--- a/ballista/core/src/serde/generated/ballista.rs
+++ b/ballista/core/src/serde/generated/ballista.rs
@@ -540,7 +540,7 @@ pub mod executor_metric {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ExecutorStatus {
- #[prost(oneof = "executor_status::Status", tags = "1, 2, 3")]
+ #[prost(oneof = "executor_status::Status", tags = "1, 2, 3, 4")]
pub status: ::core::option::Option<executor_status::Status>,
}
/// Nested message and enum types in `ExecutorStatus`.
@@ -554,6 +554,8 @@ pub mod executor_status {
Dead(::prost::alloc::string::String),
#[prost(string, tag = "3")]
Unknown(::prost::alloc::string::String),
+ #[prost(string, tag = "4")]
+ Terminating(::prost::alloc::string::String),
}
}
#[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/ballista/executor/src/executor.rs
b/ballista/executor/src/executor.rs
index d903db69..867b3ba8 100644
--- a/ballista/executor/src/executor.rs
+++ b/ballista/executor/src/executor.rs
@@ -19,7 +19,10 @@
use dashmap::DashMap;
use std::collections::HashMap;
+use std::future::Future;
+use std::pin::Pin;
use std::sync::Arc;
+use std::task::{Context, Poll};
use crate::metrics::ExecutorMetricsCollector;
use ballista_core::error::BallistaError;
@@ -37,6 +40,20 @@ use futures::future::AbortHandle;
use ballista_core::serde::scheduler::PartitionId;
+pub struct TasksDrainedFuture(pub Arc<Executor>);
+
+impl Future for TasksDrainedFuture {
+ type Output = ();
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output>
{
+ if self.0.abort_handles.len() > 0 {
+ Poll::Pending
+ } else {
+ Poll::Ready(())
+ }
+ }
+}
+
type AbortHandles = Arc<DashMap<(usize, PartitionId), AbortHandle>>;
/// Ballista executor
@@ -175,6 +192,10 @@ impl Executor {
pub fn work_dir(&self) -> &str {
&self.work_dir
}
+
+ pub fn active_task_count(&self) -> usize {
+ self.abort_handles.len()
+ }
}
#[cfg(test)]
diff --git a/ballista/executor/src/executor_process.rs
b/ballista/executor/src/executor_process.rs
index 8ec76038..6db3de06 100644
--- a/ballista/executor/src/executor_process.rs
+++ b/ballista/executor/src/executor_process.rs
@@ -18,6 +18,7 @@
//! Ballista Executor Process
use std::net::SocketAddr;
+use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{env, io};
@@ -41,18 +42,21 @@ use datafusion_proto::protobuf::{LogicalPlanNode,
PhysicalPlanNode};
use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
use ballista_core::error::BallistaError;
+use ballista_core::serde::protobuf::executor_resource::Resource;
+use ballista_core::serde::protobuf::executor_status::Status;
use ballista_core::serde::protobuf::{
executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
- ExecutorRegistration, ExecutorStoppedParams,
+ ExecutorRegistration, ExecutorResource, ExecutorSpecification,
ExecutorStatus,
+ ExecutorStoppedParams, HeartBeatParams,
};
-use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{
create_grpc_client_connection, create_grpc_server,
with_object_store_provider,
};
use ballista_core::BALLISTA_VERSION;
-use crate::executor::Executor;
+use crate::executor::{Executor, TasksDrainedFuture};
+use crate::executor_server::TERMINATING;
use crate::flight_service::BallistaFlightService;
use crate::metrics::LoggingMetricsCollector;
use crate::shutdown::Shutdown;
@@ -155,12 +159,11 @@ pub async fn start_executor_process(opt:
ExecutorProcessConfig) -> Result<()> {
.map(executor_registration::OptionalHost::Host),
port: opt.port as u32,
grpc_port: opt.grpc_port as u32,
- specification: Some(
- ExecutorSpecification {
- task_slots: concurrent_tasks as u32,
- }
- .into(),
- ),
+ specification: Some(ExecutorSpecification {
+ resources: vec![ExecutorResource {
+ resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
+ }],
+ }),
};
let config = with_object_store_provider(
@@ -295,6 +298,8 @@ pub async fn start_executor_process(opt:
ExecutorProcessConfig) -> Result<()> {
shutdown_noti.subscribe_for_shutdown(),
)));
+ let tasks_drained = TasksDrainedFuture(executor);
+
// Concurrently run the service checking and listen for the `shutdown`
signal and wait for the stop request coming.
// The check_services runs until an error is encountered, so under normal
circumstances, this `select!` statement runs
// until the `shutdown` signal is received or a stop request is coming.
@@ -319,7 +324,41 @@ pub async fn start_executor_process(opt:
ExecutorProcessConfig) -> Result<()> {
},
};
+ // Set status to fenced
+ info!("setting executor to TERMINATING status");
+ TERMINATING.store(true, Ordering::Release);
+
if notify_scheduler {
+ // Send a heartbeat to update status of executor to `Fenced`. This
should signal to the
+ // scheduler to no longer schedule tasks on this executor
+ if let Err(error) = scheduler
+ .heart_beat_from_executor(HeartBeatParams {
+ executor_id: executor_id.clone(),
+ metrics: vec![],
+ status: Some(ExecutorStatus {
+ status: Some(Status::Terminating(String::default())),
+ }),
+ metadata: Some(ExecutorRegistration {
+ id: executor_id.clone(),
+ optional_host: opt
+ .external_host
+ .clone()
+ .map(executor_registration::OptionalHost::Host),
+ port: opt.port as u32,
+ grpc_port: opt.grpc_port as u32,
+ specification: Some(ExecutorSpecification {
+ resources: vec![ExecutorResource {
+ resource:
Some(Resource::TaskSlots(concurrent_tasks as u32)),
+ }],
+ }),
+ }),
+ })
+ .await
+ {
+ error!("error sending heartbeat with fenced status: {:?}", error);
+ }
+
+ // TODO we probably don't need a separate rpc call for this....
if let Err(error) = scheduler
.executor_stopped(ExecutorStoppedParams {
executor_id,
@@ -329,6 +368,9 @@ pub async fn start_executor_process(opt:
ExecutorProcessConfig) -> Result<()> {
{
error!("ExecutorStopped grpc failed: {:?}", error);
}
+
+ // Wait for tasks to drain
+ tasks_drained.await;
}
// Extract the `shutdown_complete` receiver and transmitter
diff --git a/ballista/executor/src/executor_server.rs
b/ballista/executor/src/executor_server.rs
index 2372b6ca..89f2eef5 100644
--- a/ballista/executor/src/executor_server.rs
+++ b/ballista/executor/src/executor_server.rs
@@ -20,6 +20,7 @@ use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::Deref;
use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
@@ -195,6 +196,10 @@ struct ExecutorEnv {
unsafe impl Sync for ExecutorEnv {}
+/// Global flag indicating whether the executor is terminating. This should be
+/// set to `true` when the executor receives a shutdown signal
+pub static TERMINATING: AtomicBool = AtomicBool::new(false);
+
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
ExecutorServer<T, U> {
fn new(
scheduler_to_register: SchedulerGrpcClient<Channel>,
@@ -240,11 +245,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> ExecutorServer<T,
/// 1. First Heartbeat to its registration scheduler, if successful then
return; else go next.
/// 2. Heartbeat to schedulers which has launching tasks to this executor
until one succeeds
async fn heartbeat(&self) {
+ let status = if TERMINATING.load(Ordering::Acquire) {
+ executor_status::Status::Terminating(String::default())
+ } else {
+ executor_status::Status::Active(String::default())
+ };
+
let heartbeat_params = HeartBeatParams {
executor_id: self.executor.metadata.id.clone(),
metrics: self.get_executor_metrics(),
status: Some(ExecutorStatus {
- status: Some(executor_status::Status::Active("".to_string())),
+ status: Some(status),
}),
metadata: Some(self.executor.metadata.clone()),
};
diff --git a/ballista/scheduler/scheduler_config_spec.toml
b/ballista/scheduler/scheduler_config_spec.toml
index f6575b43..93ffe4c1 100644
--- a/ballista/scheduler/scheduler_config_spec.toml
+++ b/ballista/scheduler/scheduler_config_spec.toml
@@ -141,3 +141,9 @@ name = "job_resubmit_interval_ms"
type = "u64"
default = "0"
doc = "If job is not able to be scheduled on submission, wait for this
interval and resubmit. Default value of 0 indicates that job should not be
resubmitted"
+
+[[param]]
+name = "executor_termination_grace_period"
+type = "u64"
+default = "30"
doc = "Time in seconds to wait after an executor enters terminating status
before considering it lost"
diff --git a/ballista/scheduler/src/bin/main.rs
b/ballista/scheduler/src/bin/main.rs
index 8be0078a..93904901 100644
--- a/ballista/scheduler/src/bin/main.rs
+++ b/ballista/scheduler/src/bin/main.rs
@@ -118,6 +118,7 @@ async fn main() -> Result<()> {
cluster_storage: ClusterStorageConfig::Memory,
job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0)
.then_some(opt.job_resubmit_interval_ms),
+ executor_termination_grace_period:
opt.executor_termination_grace_period,
};
let cluster = BallistaCluster::new_from_config(&config).await?;
diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs
index ddd05b78..087e3867 100644
--- a/ballista/scheduler/src/config.rs
+++ b/ballista/scheduler/src/config.rs
@@ -49,6 +49,9 @@ pub struct SchedulerConfig {
pub job_resubmit_interval_ms: Option<u64>,
/// Configuration for ballista cluster storage
pub cluster_storage: ClusterStorageConfig,
+ /// Time in seconds to allow executor for graceful shutdown. Once an
executor signals it has entered Terminating status
+ /// the scheduler should only consider the executor dead after this time
interval has elapsed
+ pub executor_termination_grace_period: u64,
}
impl Default for SchedulerConfig {
@@ -65,6 +68,7 @@ impl Default for SchedulerConfig {
advertise_flight_sql_endpoint: None,
cluster_storage: ClusterStorageConfig::Memory,
job_resubmit_interval_ms: None,
+ executor_termination_grace_period: 0,
}
}
}
@@ -141,6 +145,11 @@ impl SchedulerConfig {
self.job_resubmit_interval_ms = Some(interval_ms);
self
}
+
+ pub fn with_remove_executor_wait_secs(mut self, value: u64) -> Self {
+ self.executor_termination_grace_period = value;
+ self
+ }
}
#[derive(Clone, Debug)]
diff --git a/ballista/scheduler/src/scheduler_server/grpc.rs
b/ballista/scheduler/src/scheduler_server/grpc.rs
index bff078d0..de07a06d 100644
--- a/ballista/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/scheduler/src/scheduler_server/grpc.rs
@@ -258,6 +258,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
metrics,
status,
};
+
self.state
.executor_manager
.save_executor_heartbeat(executor_heartbeat)
@@ -521,13 +522,14 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerGrpc
error!("{}", msg);
Status::internal(msg)
})?;
- Self::remove_executor(executor_manager, event_sender, &executor_id,
Some(reason))
- .await
- .map_err(|e| {
- let msg = format!("Error to remove executor in Scheduler due
to {e:?}");
- error!("{}", msg);
- Status::internal(msg)
- })?;
+
+ Self::remove_executor(
+ executor_manager,
+ event_sender,
+ &executor_id,
+ Some(reason),
+ self.executor_termination_grace_period,
+ );
Ok(Response::new(ExecutorStoppedResult {}))
}
@@ -603,6 +605,7 @@ mod test {
use crate::state::executor_manager::DEFAULT_EXECUTOR_TIMEOUT_SECONDS;
use crate::state::SchedulerState;
+ use crate::test_utils::await_condition;
use crate::test_utils::test_cluster_context;
use super::{SchedulerGrpc, SchedulerServer};
@@ -702,7 +705,7 @@ mod test {
"localhost:50050".to_owned(),
cluster.clone(),
BallistaCodec::default(),
- SchedulerConfig::default(),
+ SchedulerConfig::default().with_remove_executor_wait_secs(0),
default_metrics_collector().unwrap(),
);
scheduler.init().await?;
@@ -760,15 +763,22 @@ mod test {
.await
.expect("getting executor");
+ let is_stopped = await_condition(Duration::from_millis(10), 5, || {
+
futures::future::ready(Ok(state.executor_manager.is_dead_executor("abc")))
+ })
+ .await?;
+
// executor should be marked to dead
- assert!(state.executor_manager.is_dead_executor("abc"));
+ assert!(is_stopped, "Executor not marked dead after 50ms");
let active_executors = state
.executor_manager
.get_alive_executors_within_one_minute();
assert!(active_executors.is_empty());
- let expired_executors = state.executor_manager.get_expired_executors();
+ let expired_executors = state
+ .executor_manager
+
.get_expired_executors(scheduler.executor_termination_grace_period);
assert!(expired_executors.is_empty());
Ok(())
@@ -895,7 +905,9 @@ mod test {
.get_alive_executors_within_one_minute();
assert_eq!(active_executors.len(), 1);
- let expired_executors = state.executor_manager.get_expired_executors();
+ let expired_executors = state
+ .executor_manager
+
.get_expired_executors(scheduler.executor_termination_grace_period);
assert!(expired_executors.is_empty());
// simulate the heartbeat timeout
diff --git a/ballista/scheduler/src/scheduler_server/mod.rs
b/ballista/scheduler/src/scheduler_server/mod.rs
index de37365b..fc40b017 100644
--- a/ballista/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/scheduler/src/scheduler_server/mod.rs
@@ -40,6 +40,7 @@ use
crate::scheduler_server::query_stage_scheduler::QueryStageScheduler;
use crate::state::executor_manager::{
ExecutorManager, ExecutorReservation, DEFAULT_EXECUTOR_TIMEOUT_SECONDS,
+ EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
};
use crate::state::task_manager::TaskLauncher;
@@ -65,6 +66,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U:
'static + AsExecutionP
pub(crate) state: Arc<SchedulerState<T, U>>,
pub(crate) query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
query_stage_scheduler: Arc<QueryStageScheduler<T, U>>,
+ executor_termination_grace_period: u64,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
SchedulerServer<T, U> {
@@ -98,6 +100,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
state,
query_stage_event_loop,
query_stage_scheduler,
+ executor_termination_grace_period:
config.executor_termination_grace_period,
}
}
@@ -134,6 +137,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
state,
query_stage_event_loop,
query_stage_scheduler,
+ executor_termination_grace_period:
config.executor_termination_grace_period,
}
}
@@ -216,83 +220,113 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
fn expire_dead_executors(&self) -> Result<()> {
let state = self.state.clone();
let event_sender = self.query_stage_event_loop.get_sender()?;
+ let termination_grace_period = self.executor_termination_grace_period;
tokio::task::spawn(async move {
loop {
- let expired_executors =
state.executor_manager.get_expired_executors();
+ let expired_executors = state
+ .executor_manager
+ .get_expired_executors(termination_grace_period);
for expired in expired_executors {
let executor_id = expired.executor_id.clone();
let executor_manager = state.executor_manager.clone();
- let stop_reason = format!(
- "Executor {} heartbeat timed out after {}s",
- executor_id.clone(),
- DEFAULT_EXECUTOR_TIMEOUT_SECONDS
- );
- warn!("{}", stop_reason.clone());
+
let sender_clone = event_sender.clone();
+
+ let terminating = matches!(
+ expired
+ .status
+ .as_ref()
+ .and_then(|status| status.status.as_ref()),
+
Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_))
+ );
+
+ let stop_reason = if terminating {
+ format!(
+ "TERMINATING executor {executor_id} heartbeat timed
out after {termination_grace_period}s"
+ )
+ } else {
+ format!(
+ "ACTIVE executor {executor_id} heartbeat timed out
after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s",
+ )
+ };
+
+ warn!("{stop_reason}");
+
+ // If executor is expired, remove it immediately
Self::remove_executor(
executor_manager,
sender_clone,
&executor_id,
Some(stop_reason.clone()),
- )
- .await
- .unwrap_or_else(|e| {
- let msg =
- format!("Error to remove Executor in Scheduler due
to {e:?}");
- error!("{}", msg);
- });
+ 0,
+ );
- match
state.executor_manager.get_client(&executor_id).await {
- Ok(mut client) => {
- tokio::task::spawn(async move {
- match client
- .stop_executor(StopExecutorParams {
- executor_id,
- reason: stop_reason,
- force: true,
- })
- .await
- {
- Err(error) => {
- warn!(
+ // If executor is not already terminating then stop it. If
it is terminating then it should already be shutting
+ // down and we do not need to do anything here.
+ if !terminating {
+ match
state.executor_manager.get_client(&executor_id).await {
+ Ok(mut client) => {
+ tokio::task::spawn(async move {
+ match client
+ .stop_executor(StopExecutorParams {
+ executor_id,
+ reason: stop_reason,
+ force: true,
+ })
+ .await
+ {
+ Err(error) => {
+ warn!(
"Failed to send stop_executor rpc
due to, {}",
error
);
+ }
+ Ok(_value) => {}
}
- Ok(_value) => {}
- }
- });
- }
- Err(_) => {
- warn!("Executor is already dead, failed to connect
to Executor {}", executor_id);
+ });
+ }
+ Err(_) => {
+ warn!("Executor is already dead, failed to
connect to Executor {}", executor_id);
+ }
}
}
}
-
tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
- .await;
+ tokio::time::sleep(Duration::from_secs(
+ EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
+ ))
+ .await;
}
});
Ok(())
}
- pub(crate) async fn remove_executor(
+ pub(crate) fn remove_executor(
executor_manager: ExecutorManager,
event_sender: EventSender<QueryStageSchedulerEvent>,
executor_id: &str,
reason: Option<String>,
- ) -> Result<()> {
- // Update the executor manager immediately here
- executor_manager
- .remove_executor(executor_id, reason.clone())
- .await?;
+ wait_secs: u64,
+ ) {
+ let executor_id = executor_id.to_owned();
+ tokio::spawn(async move {
+ // Wait for `wait_secs` before removing executor
+ tokio::time::sleep(Duration::from_secs(wait_secs)).await;
+
+ // Update the executor manager immediately here
+ if let Err(e) = executor_manager
+ .remove_executor(&executor_id, reason.clone())
+ .await
+ {
+ error!("error removing executor {executor_id}: {e:?}");
+ }
- event_sender
- .post_event(QueryStageSchedulerEvent::ExecutorLost(
- executor_id.to_owned(),
- reason,
- ))
- .await?;
- Ok(())
+ if let Err(e) = event_sender
+
.post_event(QueryStageSchedulerEvent::ExecutorLost(executor_id, reason))
+ .await
+ {
+ error!("error sending ExecutorLost event: {e:?}");
+ }
+ });
}
async fn do_register_executor(&self, metadata: ExecutorMetadata) ->
Result<()> {
diff --git a/ballista/scheduler/src/state/executor_manager.rs
b/ballista/scheduler/src/state/executor_manager.rs
index 9a51aa77..bec97c3b 100644
--- a/ballista/scheduler/src/state/executor_manager.rs
+++ b/ballista/scheduler/src/state/executor_manager.rs
@@ -85,6 +85,10 @@ impl ExecutorReservation {
/// to be dead.
pub const DEFAULT_EXECUTOR_TIMEOUT_SECONDS: u64 = 180;
+// TODO move to configuration file
+/// Interval check for expired or dead executors
+pub const EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS: u64 = 15;
+
#[derive(Clone)]
pub(crate) struct ExecutorManager {
// executor slot policy
@@ -140,14 +144,19 @@ impl ExecutorManager {
tokio::task::spawn(async move {
while let Some(heartbeat) = heartbeat_stream.next().await {
let executor_id = heartbeat.executor_id.clone();
- if let Some(ExecutorStatus {
- status: Some(executor_status::Status::Dead(_)),
- }) = heartbeat.status
+
+ match heartbeat
+ .status
+ .as_ref()
+ .and_then(|status| status.status.as_ref())
{
- heartbeats.remove(&executor_id);
- dead_executors.insert(executor_id);
- } else {
- heartbeats.insert(executor_id, heartbeat);
+ Some(executor_status::Status::Dead(_)) => {
+ heartbeats.remove(&executor_id);
+ dead_executors.insert(executor_id);
+ }
+ _ => {
+ heartbeats.insert(executor_id, heartbeat);
+ }
}
}
});
@@ -614,27 +623,62 @@ impl ExecutorManager {
.iter()
.filter_map(|pair| {
let (exec, heartbeat) = pair.pair();
- (heartbeat.timestamp > last_seen_ts_threshold).then(||
exec.clone())
+
+ let active = matches!(
+ heartbeat
+ .status
+ .as_ref()
+ .and_then(|status| status.status.as_ref()),
+ Some(executor_status::Status::Active(_))
+ );
+ let live = heartbeat.timestamp > last_seen_ts_threshold;
+
+ (active && live).then(|| exec.clone())
})
.collect()
}
/// Return a list of expired executors
- pub(crate) fn get_expired_executors(&self) -> Vec<ExecutorHeartbeat> {
+ pub(crate) fn get_expired_executors(
+ &self,
+ termination_grace_period: u64,
+ ) -> Vec<ExecutorHeartbeat> {
let now_epoch_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
+ // Threshold for last heartbeat from Active executor before marking
dead
let last_seen_threshold = now_epoch_ts
.checked_sub(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
.unwrap_or_else(|| Duration::from_secs(0))
.as_secs();
+ // Threshold for last heartbeat for Fenced executor before marking dead
+ let termination_wait_threshold = now_epoch_ts
+ .checked_sub(Duration::from_secs(termination_grace_period))
+ .unwrap_or_else(|| Duration::from_secs(0))
+ .as_secs();
+
let expired_executors = self
.executors_heartbeat
.iter()
.filter_map(|pair| {
let (_exec, heartbeat) = pair.pair();
- (heartbeat.timestamp <= last_seen_threshold).then(||
heartbeat.clone())
+
+ let terminating = matches!(
+ heartbeat
+ .status
+ .as_ref()
+ .and_then(|status| status.status.as_ref()),
+ Some(executor_status::Status::Terminating(_))
+ );
+
+ let grace_period_expired =
+ heartbeat.timestamp <= termination_wait_threshold;
+
+ let expired = heartbeat.timestamp <= last_seen_threshold;
+
+ ((terminating && grace_period_expired) || expired)
+ .then(|| heartbeat.clone())
})
.collect::<Vec<_>>();
expired_executors
@@ -656,9 +700,12 @@ mod test {
use crate::config::SlotsPolicy;
+ use crate::scheduler_server::timestamp_secs;
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use crate::test_utils::test_cluster_context;
use ballista_core::error::Result;
+ use ballista_core::serde::protobuf::executor_status::Status;
+ use ballista_core::serde::protobuf::{ExecutorHeartbeat, ExecutorStatus};
use ballista_core::serde::scheduler::{
ExecutorData, ExecutorMetadata, ExecutorSpecification,
};
@@ -844,6 +891,56 @@ mod test {
Ok(())
}
+ #[tokio::test]
+ async fn test_ignore_fenced_executors() -> Result<()> {
+ test_ignore_fenced_executors_inner(SlotsPolicy::Bias).await?;
+ test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobin).await?;
+
test_ignore_fenced_executors_inner(SlotsPolicy::RoundRobinLocal).await?;
+
+ Ok(())
+ }
+
+ async fn test_ignore_fenced_executors_inner(slots_policy: SlotsPolicy) ->
Result<()> {
+ let cluster = test_cluster_context();
+
+ let executor_manager =
+ ExecutorManager::new(cluster.cluster_state(), slots_policy);
+
+ // Setup two executors initially
+ let executors = test_executors(2, 4);
+
+ for (executor_metadata, executor_data) in executors {
+ let _ = executor_manager
+ .register_executor(executor_metadata, executor_data, false)
+ .await?;
+ }
+
+ // Fence one of the executors
+ executor_manager
+ .save_executor_heartbeat(ExecutorHeartbeat {
+ executor_id: "executor-0".to_string(),
+ timestamp: timestamp_secs(),
+ metrics: vec![],
+ status: Some(ExecutorStatus {
+ status: Some(Status::Terminating(String::default())),
+ }),
+ })
+ .await?;
+
+ let reservations = executor_manager.reserve_slots(8).await?;
+
+ assert_eq!(reservations.len(), 4, "Expected only four reservations");
+
+ assert!(
+ reservations
+ .iter()
+ .all(|res| res.executor_id == "executor-1"),
+ "Expected all reservations from non-fenced executor",
+ );
+
+ Ok(())
+ }
+
fn test_executors(
total_executors: usize,
slots_per_executor: u32,