mingmwang commented on code in PR #667:
URL: https://github.com/apache/arrow-ballista/pull/667#discussion_r1121086459


##########
ballista/scheduler/src/scheduler_server/mod.rs:
##########
@@ -216,83 +220,113 @@ impl<T: 'static + AsLogicalPlan, U: 'static + 
AsExecutionPlan> SchedulerServer<T
     fn expire_dead_executors(&self) -> Result<()> {
         let state = self.state.clone();
         let event_sender = self.query_stage_event_loop.get_sender()?;
+        let termination_grace_period = self.executor_termination_grace_period;
         tokio::task::spawn(async move {
             loop {
-                let expired_executors = 
state.executor_manager.get_expired_executors();
+                let expired_executors = state
+                    .executor_manager
+                    .get_expired_executors(termination_grace_period);
                 for expired in expired_executors {
                     let executor_id = expired.executor_id.clone();
                     let executor_manager = state.executor_manager.clone();
-                    let stop_reason = format!(
-                        "Executor {} heartbeat timed out after {}s",
-                        executor_id.clone(),
-                        DEFAULT_EXECUTOR_TIMEOUT_SECONDS
-                    );
-                    warn!("{}", stop_reason.clone());
+
                     let sender_clone = event_sender.clone();
+
+                    let terminating = matches!(
+                        expired
+                            .status
+                            .as_ref()
+                            .and_then(|status| status.status.as_ref()),
+                        
Some(ballista_core::serde::protobuf::executor_status::Status::Terminating(_))
+                    );
+
+                    let stop_reason = if terminating {
+                        format!(
+                        "TERMINATING executor {executor_id} heartbeat timed 
out after {termination_grace_period}s"
+                    )
+                    } else {
+                        format!(
+                            "ACTIVE executor {executor_id} heartbeat timed out 
after {DEFAULT_EXECUTOR_TIMEOUT_SECONDS}s",
+                        )
+                    };
+
+                    warn!("{stop_reason}");
+
+                    // If executor is expired, remove it immediately
                     Self::remove_executor(
                         executor_manager,
                         sender_clone,
                         &executor_id,
                         Some(stop_reason.clone()),
-                    )
-                    .await
-                    .unwrap_or_else(|e| {
-                        let msg =
-                            format!("Error to remove Executor in Scheduler due 
to {e:?}");
-                        error!("{}", msg);
-                    });
+                        0,
+                    );
 
-                    match 
state.executor_manager.get_client(&executor_id).await {
-                        Ok(mut client) => {
-                            tokio::task::spawn(async move {
-                                match client
-                                    .stop_executor(StopExecutorParams {
-                                        executor_id,
-                                        reason: stop_reason,
-                                        force: true,
-                                    })
-                                    .await
-                                {
-                                    Err(error) => {
-                                        warn!(
+                    // If executor is not already terminating then stop it. If 
it is terminating then it should already be shutting
+                    // down and we do not need to do anything here.
+                    if !terminating {
+                        match 
state.executor_manager.get_client(&executor_id).await {
+                            Ok(mut client) => {
+                                tokio::task::spawn(async move {
+                                    match client
+                                        .stop_executor(StopExecutorParams {
+                                            executor_id,
+                                            reason: stop_reason,
+                                            force: true,
+                                        })
+                                        .await
+                                    {
+                                        Err(error) => {
+                                            warn!(
                                             "Failed to send stop_executor rpc 
due to, {}",
                                             error
                                         );
+                                        }
+                                        Ok(_value) => {}
                                     }
-                                    Ok(_value) => {}
-                                }
-                            });
-                        }
-                        Err(_) => {
-                            warn!("Executor is already dead, failed to connect 
to Executor {}", executor_id);
+                                });
+                            }
+                            Err(_) => {
+                                warn!("Executor is already dead, failed to 
connect to Executor {}", executor_id);
+                            }
                         }
                     }
                 }
-                
tokio::time::sleep(Duration::from_secs(DEFAULT_EXECUTOR_TIMEOUT_SECONDS))
-                    .await;
+                tokio::time::sleep(Duration::from_secs(
+                    EXPIRE_DEAD_EXECUTOR_INTERVAL_SECS,
+                ))
+                .await;
             }
         });
         Ok(())
     }
 
-    pub(crate) async fn remove_executor(
+    pub(crate) fn remove_executor(
         executor_manager: ExecutorManager,
         event_sender: EventSender<QueryStageSchedulerEvent>,
         executor_id: &str,
         reason: Option<String>,
-    ) -> Result<()> {
-        // Update the executor manager immediately here
-        executor_manager
-            .remove_executor(executor_id, reason.clone())
-            .await?;
+        wait_secs: u64,
+    ) {
+        let executor_id = executor_id.to_owned();
+        tokio::spawn(async move {

Review Comment:
   I think if it the `wait_secs` = 0, there is no need to spawn.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to