mingmwang commented on code in PR #151:
URL: https://github.com/apache/arrow-ballista/pull/151#discussion_r949756356
##########
ballista/rust/executor/src/main.rs:
##########
@@ -154,57 +162,156 @@ async fn main() -> Result<()> {
let scheduler_policy = opt.task_scheduling_policy;
let cleanup_ttl = opt.executor_cleanup_ttl;
+ // Graceful shutdown notification
+ let graceful = Graceful::new();
+
if opt.executor_cleanup_enable {
let mut interval_time =
time::interval(Core_Duration::from_secs(opt.executor_cleanup_interval));
+ let mut shuffle_cleaner_shutdown = graceful.subscribe_for_shutdown();
+ // Not used directly.
+ let shuffle_cleaner_complete = graceful.shutdown_complete_tx.clone();
tokio::spawn(async move {
- loop {
- interval_time.tick().await;
- if let Err(e) =
- clean_shuffle_data_loop(&work_dir, cleanup_ttl as
i64).await
- {
- error!("Ballista executor fail to clean_shuffle_data
{:?}", e)
- }
+ // When dropped, this notifies the receiver half that the shutdown is
complete
+ let _shuffle_cleaner_complete = shuffle_cleaner_complete;
+ // As long as the shutdown notification has not been received
+ while !shuffle_cleaner_shutdown.is_shutdown() {
+ tokio::select! {
+ _ = interval_time.tick() => {
+ if let Err(e) = clean_shuffle_data_loop(&work_dir,
cleanup_ttl as i64).await
+ {
+ error!("Ballista executor fail to
clean_shuffle_data {:?}", e)
+ }
+ },
+ _ = shuffle_cleaner_shutdown.recv() => {
+ if let Err(e) = clean_all_shuffle_data(&work_dir).await
+ {
+ error!("Ballista executor fail to
clean_shuffle_data {:?}", e)
+ } else {
+ info!("Shuffle data cleaned.");
+ }
+ return;
+ }
+ };
}
});
}
+ let mut service_handlers: FuturesUnordered<JoinHandle<Result<(),
BallistaError>>> =
+ FuturesUnordered::new();
+
+ // Channels used to receive stop requests from Executor grpc service.
+ let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);
+
match scheduler_policy {
TaskSchedulingPolicy::PushStaged => {
- tokio::spawn(executor_server::startup(
- scheduler,
- executor.clone(),
- default_codec,
- ));
+ service_handlers.push(
+ // If there is an executor registration error during startup,
return the error and stop early.
+ executor_server::startup(
+ scheduler,
+ executor.clone(),
+ default_codec,
+ stop_send,
+ &graceful,
+ )
+ .await?,
+ );
}
_ => {
- tokio::spawn(execution_loop::poll_loop(
+ service_handlers.push(tokio::spawn(execution_loop::poll_loop(
scheduler,
executor.clone(),
default_codec,
- ));
+ )));
}
+ };
+ service_handlers.push(tokio::spawn(flight_server_run(
+ addr,
+ graceful.subscribe_for_shutdown(),
+ )));
+
+ // Concurrently run the service checks, listen for the `shutdown`
signal, and wait for an incoming stop request.
+ // `check_services` runs until an error is encountered, so under normal
circumstances this `select!` statement runs
+ // until the `shutdown` signal is received or a stop request arrives.
+ tokio::select! {
+ service_val = check_services(&mut service_handlers) => {
+ info!("services stopped with reason {:?}", service_val);
+ },
+ _ = signal::ctrl_c() => {
Review Comment:
https://hyper.rs/guides/server/graceful-shutdown/
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]