mingmwang commented on code in PR #151:
URL: https://github.com/apache/arrow-ballista/pull/151#discussion_r949744647
##########
ballista/rust/executor/src/main.rs:
##########
@@ -154,57 +162,156 @@ async fn main() -> Result<()> {
let scheduler_policy = opt.task_scheduling_policy;
let cleanup_ttl = opt.executor_cleanup_ttl;
+ // Graceful shutdown notification
+ let graceful = Graceful::new();
+
if opt.executor_cleanup_enable {
let mut interval_time =
time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval));
+ let mut shuffle_cleaner_shutdown = graceful.subscribe_for_shutdown();
+ // Not used directly.
+ let shuffle_cleaner_complete = graceful.shutdown_complete_tx.clone();
tokio::spawn(async move {
- loop {
- interval_time.tick().await;
- if let Err(e) =
- clean_shuffle_data_loop(&work_dir, cleanup_ttl as
i64).await
- {
- error!("Ballista executor fail to clean_shuffle_data
{:?}", e)
- }
+ // Drop and notifies the receiver half that the shutdown is
complete
+ let _shuffle_cleaner_complete = shuffle_cleaner_complete;
+ // As long as the shutdown notification has not been received
+ while !shuffle_cleaner_shutdown.is_shutdown() {
+ tokio::select! {
+ _ = interval_time.tick() => {
+ if let Err(e) = clean_shuffle_data_loop(&work_dir,
cleanup_ttl as i64).await
+ {
+ error!("Ballista executor fail to
clean_shuffle_data {:?}", e)
+ }
+ },
+ _ = shuffle_cleaner_shutdown.recv() => {
+ if let Err(e) = clean_all_shuffle_data(&work_dir).await
+ {
+ error!("Ballista executor fail to
clean_shuffle_data {:?}", e)
+ } else {
+ info!("Shuffle data cleaned.");
+ }
+ return;
+ }
+ };
}
});
}
+ let mut service_handlers: FuturesUnordered<JoinHandle<Result<(),
BallistaError>>> =
+ FuturesUnordered::new();
+
+ // Channels used to receive stop requests from Executor grpc service.
+ let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);
+
match scheduler_policy {
TaskSchedulingPolicy::PushStaged => {
- tokio::spawn(executor_server::startup(
- scheduler,
- executor.clone(),
- default_codec,
- ));
+ service_handlers.push(
+ //If there is executor registration error during startup,
return the error and stop early.
+ executor_server::startup(
+ scheduler,
+ executor.clone(),
+ default_codec,
+ stop_send,
+ &graceful,
+ )
+ .await?,
+ );
}
_ => {
- tokio::spawn(execution_loop::poll_loop(
+ service_handlers.push(tokio::spawn(execution_loop::poll_loop(
Review Comment:
Good catch. I didn't add the shutdown logic here simply because this poll
loop is also used by the standalone Executor, and I'm not sure whether a
standalone Executor needs graceful shutdown.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]