avantgardnerio commented on code in PR #667:
URL: https://github.com/apache/arrow-ballista/pull/667#discussion_r1117683204
##########
ballista/scheduler/src/scheduler_server/mod.rs:
##########
@@ -216,83 +220,113 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
fn expire_dead_executors(&self) -> Result<()> {
let state = self.state.clone();
let event_sender = self.query_stage_event_loop.get_sender()?;
+ let termination_grace_period = self.executor_termination_grace_period;
tokio::task::spawn(async move {
loop {
-                let expired_executors = state.executor_manager.get_expired_executors();
+ let expired_executors = state
+ .executor_manager
+ .get_expired_executors(termination_grace_period);
for expired in expired_executors {
let executor_id = expired.executor_id.clone();
let executor_manager = state.executor_manager.clone();
- let stop_reason = format!(
- "Executor {} heartbeat timed out after {}s",
- executor_id.clone(),
- DEFAULT_EXECUTOR_TIMEOUT_SECONDS
- );
- warn!("{}", stop_reason.clone());
+
let sender_clone = event_sender.clone();
+
+ let terminating = matches!(
+ expired
+ .status
+ .as_ref()
+ .and_then(|status| status.status.as_ref()),
+                        Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_))
+ );
+
+ let stop_reason = if terminating {
Review Comment:
Seems logical...
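
For readers less familiar with the prost-generated types: the `as_ref().and_then(...)` chain above exists because the heartbeat status is an `Option` wrapping another `Option`. Below is a minimal, self-contained sketch of that check, using simplified stand-in types rather than the real `ballista_core::serde::protobuf` structs:

```rust
// Simplified stand-in types; the real ones are the prost-generated structs
// in ballista_core::serde::protobuf, which nest an Option inside an Option.
#[allow(dead_code)]
#[derive(Debug)]
enum Status {
    Active(String),
    Terminating(String),
}

#[derive(Debug)]
struct ExecutorStatus {
    status: Option<Status>,
}

#[derive(Debug)]
struct ExecutorHeartbeat {
    executor_id: String,
    status: Option<ExecutorStatus>,
}

// `and_then` flattens Option<&ExecutorStatus> into Option<&Status>, and
// `matches!` then tests the variant while ignoring its payload.
fn is_terminating(hb: &ExecutorHeartbeat) -> bool {
    matches!(
        hb.status.as_ref().and_then(|s| s.status.as_ref()),
        Some(Status::Terminating(_))
    )
}

fn main() {
    let hb = ExecutorHeartbeat {
        executor_id: "abc".to_owned(),
        status: Some(ExecutorStatus {
            status: Some(Status::Terminating(String::new())),
        }),
    };
    println!("{} terminating: {}", hb.executor_id, is_terminating(&hb));
}
```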
##########
ballista/scheduler/src/scheduler_server/grpc.rs:
##########
@@ -760,15 +763,22 @@ mod test {
.await
.expect("getting executor");
+ let is_stopped = await_condition(Duration::from_millis(10), 5, || {
+            futures::future::ready(Ok(state.executor_manager.is_dead_executor("abc")))
+ })
+ .await?;
+
// executor should be marked to dead
- assert!(state.executor_manager.is_dead_executor("abc"));
+ assert!(is_stopped, "Executor not marked dead after 50ms");
Review Comment:
Nice test
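
For context on the pattern: the test polls the executor-manager state instead of asserting immediately, which tolerates the asynchronous expiry sweep. A hedged sketch of what a poll-until-true helper like `await_condition` could look like (the real helper's location and exact signature may differ; this sketch assumes the tokio and futures crates):

```rust
use std::convert::Infallible;
use std::future::Future;
use std::time::Duration;

// Hypothetical poll-until-true helper: retry `attempts` times, sleeping
// `delay` between checks, and report whether the condition ever held.
async fn await_condition<F, Fut, E>(
    delay: Duration,
    attempts: usize,
    mut condition: F,
) -> Result<bool, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<bool, E>>,
{
    for _ in 0..attempts {
        if condition().await? {
            return Ok(true);
        }
        tokio::time::sleep(delay).await;
    }
    Ok(false)
}

#[tokio::main]
async fn main() -> Result<(), Infallible> {
    // Stand-in for polling `state.executor_manager.is_dead_executor("abc")`:
    // here the condition simply becomes true on the third check.
    let mut checks = 0;
    let is_stopped = await_condition(Duration::from_millis(10), 5, || {
        checks += 1;
        futures::future::ready(Ok::<bool, Infallible>(checks >= 3))
    })
    .await?;
    assert!(is_stopped, "condition not met within 5 attempts");
    Ok(())
}
```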
##########
ballista/scheduler/src/scheduler_server/mod.rs:
##########
@@ -216,83 +220,113 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
fn expire_dead_executors(&self) -> Result<()> {
let state = self.state.clone();
let event_sender = self.query_stage_event_loop.get_sender()?;
+ let termination_grace_period = self.executor_termination_grace_period;
tokio::task::spawn(async move {
loop {
-                let expired_executors = state.executor_manager.get_expired_executors();
+ let expired_executors = state
+ .executor_manager
+ .get_expired_executors(termination_grace_period);
for expired in expired_executors {
let executor_id = expired.executor_id.clone();
let executor_manager = state.executor_manager.clone();
- let stop_reason = format!(
- "Executor {} heartbeat timed out after {}s",
- executor_id.clone(),
- DEFAULT_EXECUTOR_TIMEOUT_SECONDS
- );
- warn!("{}", stop_reason.clone());
+
let sender_clone = event_sender.clone();
+
+ let terminating = matches!(
+ expired
+ .status
+ .as_ref()
+ .and_then(|status| status.status.as_ref()),
+                        Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_))
+ );
+
+ let stop_reason = if terminating {
+ format!(
+                            "TERMINATING executor {executor_id} heartbeat timed out after {termination_grace_period}s"
+ )
+ } else {
+ format!(
+                            "ACTIVE executor {executor_id} heartbeat timed out after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s",
+ )
+ };
+
+ warn!("{stop_reason}");
+
+ // If executor is expired, remove it immediately
Self::remove_executor(
executor_manager,
sender_clone,
&executor_id,
Some(stop_reason.clone()),
- )
- .await
- .unwrap_or_else(|e| {
- let msg =
-                            format!("Error to remove Executor in Scheduler due to {e:?}");
- error!("{}", msg);
- });
+ 0,
+ );
-                    match state.executor_manager.get_client(&executor_id).await {
- Ok(mut client) => {
- tokio::task::spawn(async move {
- match client
- .stop_executor(StopExecutorParams {
- executor_id,
- reason: stop_reason,
- force: true,
- })
- .await
- {
- Err(error) => {
- warn!(
+                    // If executor is not already terminating then stop it. If it is terminating then it should already be shutting
+ // down and we do not need to do anything here.
+ if !terminating {
+                        match state.executor_manager.get_client(&executor_id).await {
+ Ok(mut client) => {
+ tokio::task::spawn(async move {
+ match client
+ .stop_executor(StopExecutorParams {
+ executor_id,
+ reason: stop_reason,
+ force: true,
+ })
+ .await
+ {
+ Err(error) => {
+ warn!(
"Failed to send stop_executor rpc
due to, {}",
error
);
+ }
+ Ok(_value) => {}
}
- Ok(_value) => {}
- }
- });
- }
- Err(_) => {
-                        warn!("Executor is already dead, failed to connect to Executor {}", executor_id);
+ });
+ }
+ Err(_) => {
+                                warn!("Executor is already dead, failed to connect to Executor {}", executor_id);
+ }
}
}
}
-                tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
- .await;
+ tokio::time::sleep(Duration::from_secs(
+ EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
+ ))
+ .await;
}
});
Ok(())
}
- pub(crate) async fn remove_executor(
+ pub(crate) fn remove_executor(
executor_manager: ExecutorManager,
event_sender: EventSender<QueryStageSchedulerEvent>,
executor_id: &str,
reason: Option<String>,
- ) -> Result<()> {
- // Update the executor manager immediately here
- executor_manager
- .remove_executor(executor_id, reason.clone())
- .await?;
+ wait_secs: u64,
+ ) {
+ let executor_id = executor_id.to_owned();
+ tokio::spawn(async move {
Review Comment:
I was worried that spawning a task every time, without the backpressure of blocking or an idempotency check, could cause a storm of `ExecutorLost` events, but it seems this is handled by:
```
self.cluster_state.remove_executor(executor_id).await?;
...
self.executors_heartbeat.remove(&executor_id);
...
self.dead_executors.insert(executor_id);
```
?
It's not always easy to build a mental swim-lane diagram of how all these pieces of state, messages, and events interact.
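
For what it's worth, here is a toy model of the idempotency those quoted lines suggest (not the real `ExecutorManager`; the early-return guard is an assumption about intent): once an executor id is in the dead set, a later removal becomes a no-op, so repeated expiry sweeps would not emit duplicate `ExecutorLost` events.

```rust
use std::collections::HashSet;

// Hypothetical event type standing in for the scheduler's ExecutorLost event.
#[derive(Debug)]
enum Event {
    ExecutorLost {
        executor_id: String,
        reason: Option<String>,
    },
}

#[derive(Default)]
struct ExecutorManagerSketch {
    dead_executors: HashSet<String>,
}

impl ExecutorManagerSketch {
    // Only the first removal of a given executor produces an event;
    // HashSet::insert returns false when the id is already present.
    fn remove_executor(&mut self, executor_id: &str, reason: Option<String>) -> Option<Event> {
        if !self.dead_executors.insert(executor_id.to_owned()) {
            return None; // already dead: idempotent no-op
        }
        Some(Event::ExecutorLost {
            executor_id: executor_id.to_owned(),
            reason,
        })
    }
}

fn main() {
    let mut mgr = ExecutorManagerSketch::default();
    let first = mgr.remove_executor("abc", Some("heartbeat timed out".to_owned()));
    let second = mgr.remove_executor("abc", Some("heartbeat timed out".to_owned()));
    println!("first sweep:  {first:?}");
    println!("second sweep: {second:?}");
    assert!(first.is_some() && second.is_none());
}
```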
##########
ballista/executor/src/executor_process.rs:
##########
@@ -319,7 +324,41 @@ pub async fn start_executor_process(opt: ExecutorProcessConfig) -> Result<()> {
},
};
+ // Set status to fenced
+ info!("setting executor to TERMINATING status");
+ TERMINATING.store(true, Ordering::Release);
+
if notify_scheduler {
+        // Send a heartbeat to update status of executor to `Fenced`. This should signal to the
+ // scheduler to no longer scheduler tasks on this executor
Review Comment:
```suggestion
// scheduler to no longer schedule tasks on this executor
```
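
Stepping back from the typo, the flow in this hunk is: flip a process-wide `TERMINATING` flag, then let the next heartbeat report a Terminating status so the scheduler stops assigning tasks and applies the termination grace period rather than the default heartbeat timeout. A self-contained sketch of that flag-to-status hand-off, with a simplified stand-in for the protobuf status type:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Process-wide fencing flag, mirroring the TERMINATING static used above.
static TERMINATING: AtomicBool = AtomicBool::new(false);

// Simplified stand-in for the executor status reported in heartbeats.
#[derive(Debug)]
enum Status {
    Active(String),
    Terminating(String),
}

// What the next heartbeat would report once the flag is set.
fn heartbeat_status() -> Status {
    if TERMINATING.load(Ordering::Acquire) {
        Status::Terminating(String::default())
    } else {
        Status::Active(String::default())
    }
}

fn main() {
    println!("before shutdown: {:?}", heartbeat_status());
    // Shutdown signal received: fence the executor before notifying the scheduler.
    TERMINATING.store(true, Ordering::Release);
    println!("after shutdown:  {:?}", heartbeat_status());
}
```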
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]