thinkharderdev commented on code in PR #151:
URL: https://github.com/apache/arrow-ballista/pull/151#discussion_r948965815
##########
ballista/rust/executor/src/execution_loop.rs:
##########
@@ -108,7 +107,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
}
}
Err(error) => {
- warn!("Executor registration failed. If this continues to
happen the executor might be marked as dead by the scheduler. Error: {}",
error);
+ warn!("Executor poll work loop failed. If this continues to
happen the Scheduler might be marked as dead. Error: {}", error);
Review Comment:
Not sure the second edit makes sense. Why would the scheduler be marked as
dead?
##########
ballista/rust/executor/src/main.rs:
##########
@@ -154,57 +162,156 @@ async fn main() -> Result<()> {
let scheduler_policy = opt.task_scheduling_policy;
let cleanup_ttl = opt.executor_cleanup_ttl;
+ // Graceful shutdown notification
+ let graceful = Graceful::new();
+
if opt.executor_cleanup_enable {
let mut interval_time =
time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval));
+ let mut shuffle_cleaner_shutdown = graceful.subscribe_for_shutdown();
+ // Not used directly.
+ let shuffle_cleaner_complete = graceful.shutdown_complete_tx.clone();
tokio::spawn(async move {
- loop {
- interval_time.tick().await;
- if let Err(e) =
- clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
- {
- error!("Ballista executor fail to clean_shuffle_data
{:?}", e)
- }
+ // Drop and notifies the receiver half that the shutdown is complete
+ let _shuffle_cleaner_complete = shuffle_cleaner_complete;
+ // As long as the shutdown notification has not been received
+ while !shuffle_cleaner_shutdown.is_shutdown() {
+ tokio::select! {
+ _ = interval_time.tick() => {
+ if let Err(e) = clean_shuffle_data_loop(&work_dir, cleanup_ttl as i64).await
+ {
+ error!("Ballista executor fail to
clean_shuffle_data {:?}", e)
+ }
+ },
+ _ = shuffle_cleaner_shutdown.recv() => {
+ if let Err(e) = clean_all_shuffle_data(&work_dir).await
+ {
+ error!("Ballista executor fail to
clean_shuffle_data {:?}", e)
+ } else {
+ info!("Shuffle data cleaned.");
+ }
+ return;
+ }
+ };
}
});
}
+ let mut service_handlers: FuturesUnordered<JoinHandle<Result<(), BallistaError>>> =
+ FuturesUnordered::new();
+
+ // Channels used to receive stop requests from Executor grpc service.
+ let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);
+
match scheduler_policy {
TaskSchedulingPolicy::PushStaged => {
- tokio::spawn(executor_server::startup(
- scheduler,
- executor.clone(),
- default_codec,
- ));
+ service_handlers.push(
+ //If there is executor registration error during startup, return the error and stop early.
+ executor_server::startup(
+ scheduler,
+ executor.clone(),
+ default_codec,
+ stop_send,
+ &graceful,
+ )
+ .await?,
+ );
}
_ => {
- tokio::spawn(execution_loop::poll_loop(
+ service_handlers.push(tokio::spawn(execution_loop::poll_loop(
Review Comment:
Why does the poll loop not listen for shutdown signals?
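   For context on the question above, here is a minimal, self-contained sketch of how a poll-style loop could also react to a shutdown signal with `tokio::select!`, mirroring the shuffle-cleaner loop in this hunk. The `Shutdown` wrapper over a `tokio::sync::broadcast` receiver is a hypothetical stand-in, not the PR's actual `Graceful` type:
   ```rust
   use std::time::Duration;
   use tokio::sync::broadcast;
   use tokio::time;

   /// Hypothetical shutdown listener: remembers whether the signal was seen.
   struct Shutdown {
       shutdown: bool,
       notify: broadcast::Receiver<()>,
   }

   impl Shutdown {
       fn new(notify: broadcast::Receiver<()>) -> Self {
           Self { shutdown: false, notify }
       }

       fn is_shutdown(&self) -> bool {
           self.shutdown
       }

       async fn recv(&mut self) {
           if self.shutdown {
               return;
           }
           // Either a `()` value or a closed-channel error means "shut down".
           let _ = self.notify.recv().await;
           self.shutdown = true;
       }
   }

   async fn poll_loop(mut shutdown: Shutdown) {
       let mut interval = time::interval(Duration::from_millis(100));
       while !shutdown.is_shutdown() {
           tokio::select! {
               _ = interval.tick() => {
                   // Poll the scheduler for work here.
               }
               _ = shutdown.recv() => {
                   // Finish in-flight work if needed; the while condition exits the loop.
               }
           }
       }
   }

   #[tokio::main]
   async fn main() {
       let (tx, rx) = broadcast::channel(1);
       let handle = tokio::spawn(poll_loop(Shutdown::new(rx)));
       // Simulate a shutdown request shortly after startup.
       time::sleep(Duration::from_millis(300)).await;
       let _ = tx.send(());
       handle.await.unwrap();
   }
   ```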
##########
ballista/rust/executor/src/executor_server.rs:
##########
@@ -368,16 +425,31 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
//2. loop for task fetching and running
let executor_server = self.executor_server.clone();
+ let mut task_runner_shutdown = graceful.subscribe_for_shutdown();
+ // Not used directly.
+ let task_runner_complete = graceful.shutdown_complete_tx.clone();
tokio::spawn(async move {
info!("Starting the task runner pool");
+
+ // Drop and notifies the receiver half that the shutdown is complete
Review Comment:
```suggestion
// Capture `task_runner_complete`. This will be dropped when this background task returns, indicating that shutdown is complete
```
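   For reference, a minimal sketch of the completion pattern the suggested comment describes (the names here are illustrative, not the PR's `Graceful` API): each background task captures a clone of an `mpsc::Sender`, and once every clone has been dropped, `recv()` on the receiver resolves to `None`, which the coordinator treats as "shutdown complete":
   ```rust
   use tokio::sync::mpsc;

   #[tokio::main]
   async fn main() {
       let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel::<()>(1);

       for i in 0..3 {
           // Each spawned task holds a clone of the sender for its whole lifetime.
           let _complete = shutdown_complete_tx.clone();
           tokio::spawn(async move {
               // ... do work, react to shutdown, etc. ...
               println!("task {i} finished");
               // `_complete` is dropped here, releasing this task's handle on the channel.
           });
       }

       // Drop the original sender so only the spawned tasks keep the channel open.
       drop(shutdown_complete_tx);

       // Resolves to `None` only after every task has returned and dropped its clone.
       let _ = shutdown_complete_rx.recv().await;
       println!("all background tasks have shut down");
   }
   ```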
##########
ballista/rust/executor/src/executor_server.rs:
##########
@@ -310,25 +354,38 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
Self { executor_server }
}
- // There are two loop(future) running separately in tokio runtime.
- // First is for sending back task status to scheduler
- // Second is for receiving task from scheduler and run
- async fn start(
+ fn start(
&self,
mut rx_task: mpsc::Receiver<TaskDefinition>,
mut rx_task_status: mpsc::Receiver<TaskStatus>,
+ graceful: &Graceful,
) {
//1. loop for task status reporting
let executor_server = self.executor_server.clone();
+ let mut tasks_status_shutdown = graceful.subscribe_for_shutdown();
+ // Not used directly.
+ let tasks_status_complete = graceful.shutdown_complete_tx.clone();
tokio::spawn(async move {
info!("Starting the task status reporter");
- loop {
+
+ // Drop and notifies the receiver half that the shutdown is complete
Review Comment:
```suggestion
// Capture `tasks_status_complete`. This will be dropped when this background task returns, indicating that shutdown is complete
```
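   Putting both channels together, a hedged sketch of what a `Graceful` helper like the one referenced in these diffs might look like (the method and field names mirror the diff, but this implementation is an assumption, not the PR's code; the PR also appears to wrap the broadcast receiver in a listener type with `is_shutdown()`/`recv()`, as in the earlier sketch):
   ```rust
   use tokio::sync::{broadcast, mpsc};

   pub struct Graceful {
       // Fans the shutdown notification out to every subscribed task.
       notify_shutdown: broadcast::Sender<()>,
       // Cloned into each task; dropped clones signal completion.
       pub shutdown_complete_tx: mpsc::Sender<()>,
       shutdown_complete_rx: mpsc::Receiver<()>,
   }

   impl Graceful {
       pub fn new() -> Self {
           let (notify_shutdown, _) = broadcast::channel(1);
           let (shutdown_complete_tx, shutdown_complete_rx) = mpsc::channel(1);
           Self { notify_shutdown, shutdown_complete_tx, shutdown_complete_rx }
       }

       /// Hand out a shutdown listener to a background task.
       pub fn subscribe_for_shutdown(&self) -> broadcast::Receiver<()> {
           self.notify_shutdown.subscribe()
       }

       /// Broadcast the shutdown signal, then wait until every task has dropped
       /// its `shutdown_complete_tx` clone.
       pub async fn shutdown(mut self) {
           let _ = self.notify_shutdown.send(());
           drop(self.shutdown_complete_tx);
           let _ = self.shutdown_complete_rx.recv().await;
       }
   }

   #[tokio::main]
   async fn main() {
       let graceful = Graceful::new();

       let mut shutdown_rx = graceful.subscribe_for_shutdown();
       let complete = graceful.shutdown_complete_tx.clone();
       tokio::spawn(async move {
           let _complete = complete; // dropped when this task returns
           let _ = shutdown_rx.recv().await; // wait for the shutdown broadcast
           println!("background task shutting down");
       });

       graceful.shutdown().await;
       println!("shutdown complete");
   }
   ```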
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]