mingmwang commented on code in PR #667:
URL: https://github.com/apache/arrow-ballista/pull/667#discussion_r1121086459
##########
ballista/scheduler/src/scheduler_server/mod.rs:
##########
@@ -216,83 +220,113 @@ impl<T: 'static + AsLogicalPlan, U: 'static +
AsExecutionPlan> SchedulerServer<T
fn expire_dead_executors(&self) -> Result<()> {
let state = self.state.clone();
let event_sender = self.query_stage_event_loop.get_sender()?;
+ let termination_grace_period = self.executor_termination_grace_period;
tokio::task::spawn(async move {
loop {
- let expired_executors =
state.executor_manager.get_expired_executors();
+ let expired_executors = state
+ .executor_manager
+ .get_expired_executors(termination_grace_period);
for expired in expired_executors {
let executor_id = expired.executor_id.clone();
let executor_manager = state.executor_manager.clone();
- let stop_reason = format!(
- "Executor {} heartbeat timed out after {}s",
- executor_id.clone(),
- DEFAULT_EXECUTOR_TIMEOUT_SECONDS
- );
- warn!("{}", stop_reason.clone());
+
let sender_clone = event_sender.clone();
+
+ let terminating = matches!(
+ expired
+ .status
+ .as_ref()
+ .and_then(|status| status.status.as_ref()),
+
Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_))
+ );
+
+ let stop_reason = if terminating {
+ format!(
+ "TERMINATING executor {executor_id} heartbeat timed
out after {termination_grace_period}s"
+ )
+ } else {
+ format!(
+ "ACTIVE executor {executor_id} heartbeat timed out
after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s",
+ )
+ };
+
+ warn!("{stop_reason}");
+
+ // If executor is expired, remove it immediately
Self::remove_executor(
executor_manager,
sender_clone,
&executor_id,
Some(stop_reason.clone()),
- )
- .await
- .unwrap_or_else(|e| {
- let msg =
- format!("Error to remove Executor in Scheduler due
to {e:?}");
- error!("{}", msg);
- });
+ 0,
+ );
- match
state.executor_manager.get_client(&executor_id).await {
- Ok(mut client) => {
- tokio::task::spawn(async move {
- match client
- .stop_executor(StopExecutorParams {
- executor_id,
- reason: stop_reason,
- force: true,
- })
- .await
- {
- Err(error) => {
- warn!(
+ // If executor is not already terminating then stop it. If
it is terminating then it should already be shutting
+ // down and we do not need to do anything here.
+ if !terminating {
+ match
state.executor_manager.get_client(&executor_id).await {
+ Ok(mut client) => {
+ tokio::task::spawn(async move {
+ match client
+ .stop_executor(StopExecutorParams {
+ executor_id,
+ reason: stop_reason,
+ force: true,
+ })
+ .await
+ {
+ Err(error) => {
+ warn!(
"Failed to send stop_executor rpc
due to, {}",
error
);
+ }
+ Ok(_value) => {}
}
- Ok(_value) => {}
- }
- });
- }
- Err(_) => {
- warn!("Executor is already dead, failed to connect
to Executor {}", executor_id);
+ });
+ }
+ Err(_) => {
+ warn!("Executor is already dead, failed to
connect to Executor {}", executor_id);
+ }
}
}
}
-
tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
- .await;
+ tokio::time::sleep(Duration::from_secs(
+ EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
+ ))
+ .await;
}
});
Ok(())
}
- pub(crate) async fn remove_executor(
+ pub(crate) fn remove_executor(
executor_manager: ExecutorManager,
event_sender: EventSender<QueryStageSchedulerEvent>,
executor_id: &str,
reason: Option<String>,
- ) -> Result<()> {
- // Update the executor manager immediately here
- executor_manager
- .remove_executor(executor_id, reason.clone())
- .await?;
+ wait_secs: u64,
+ ) {
+ let executor_id = executor_id.to_owned();
+ tokio::spawn(async move {
Review Comment:
I think if `wait_secs` is 0, there is no need to spawn a task.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]