[ 
https://issues.apache.org/jira/browse/FLINK-7092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104886#comment-16104886
 ] 

ASF GitHub Bot commented on FLINK-7092:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4289#discussion_r130078259
  
    --- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ---
    @@ -312,6 +318,114 @@ private void recoverWorkers() throws Exception {
        }
     
        @Override
    +   public void shutDown() throws Exception {
    +           // shut down all components
    +           Future<Boolean> stopTaskMonitorFuture = null;
    +           Future<Boolean> stopConnectionMonitorFuture = null;
    +           Future<Boolean> stopLaunchCoordinatorFuture = null;
    +           Future<Boolean> stopReconciliationCoordinatorFuture = null;
    +
    +           FiniteDuration stopTimeout = null;
    +
    +           Exception exception = null;
    +
    +           if (taskMonitor != null) {
    +                   stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    +                   stopTaskMonitorFuture = 
Patterns.gracefulStop(taskMonitor, stopTimeout);
    +           }
    +
    +           if (stopTaskMonitorFuture != null) {
    +                   boolean stopped = false;
    +
    +                   try {
    +                           stopped = Await.result(stopTaskMonitorFuture, 
stopTimeout);
    +                   } catch (Exception ex) {
    +                           exception = new Exception("TaskMonitor actor 
did not properly stop.", ex);
    +                   }
    +
    +                   if (!stopped) {
    +                           // the taskMonitor actor did not stop in time, 
let's kill him
    +                           taskMonitor.tell(Kill.getInstance(), 
ActorRef.noSender());
    +                   }
    +           }
    +
    +           if (connectionMonitor != null) {
    +                   stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    +                   stopConnectionMonitorFuture = 
Patterns.gracefulStop(connectionMonitor, stopTimeout);
    +           }
    +
    +           if (stopConnectionMonitorFuture != null) {
    +                   boolean stopped = false;
    +
    +                   try {
    +                           stopped = 
Await.result(stopConnectionMonitorFuture, stopTimeout);
    +                   } catch (Exception ex) {
    +                           exception = ExceptionUtils.firstOrSuppressed(
    +                                   new Exception("ConnectionMonitor actor 
did not properly stop.", ex),
    +                                   exception
    +                           );
    +                   }
    +
    +                   if (!stopped) {
    +                           // the connectionMonitor actor did not stop in 
time, let's kill him
    +                           connectionMonitor.tell(Kill.getInstance(), 
ActorRef.noSender());
    +                   }
    +           }
    +
    +           if (launchCoordinator != null) {
    +                   stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    +                   stopLaunchCoordinatorFuture = 
Patterns.gracefulStop(launchCoordinator, stopTimeout);
    +           }
    +
    +           if (stopLaunchCoordinatorFuture != null) {
    +                   boolean stopped = false;
    +
    +                   try {
    +                           stopped = 
Await.result(stopLaunchCoordinatorFuture, stopTimeout);
    +                   } catch (Exception ex) {
    +                           exception = ExceptionUtils.firstOrSuppressed(
    +                                   new Exception("LaunchCoordinator actor 
did not properly stop.", ex),
    +                                   exception
    +                           );
    +                   }
    +
    +                   if (!stopped) {
    +                           // the launchCoordinator actor did not stop in 
time, let's kill him
    +                           launchCoordinator.tell(Kill.getInstance(), 
ActorRef.noSender());
    +                   }
    +           }
    +
    +           if (reconciliationCoordinator != null) {
    +                   stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
    +                   stopReconciliationCoordinatorFuture = 
Patterns.gracefulStop(reconciliationCoordinator, stopTimeout);
    +           }
    +
    +           if (stopReconciliationCoordinatorFuture != null) {
    +                   boolean stopped = false;
    +
    +                   try {
    +                           stopped = 
Await.result(stopReconciliationCoordinatorFuture, stopTimeout);
    +                   } catch (Exception ex) {
    +                           exception = ExceptionUtils.firstOrSuppressed(
    +                                   new 
Exception("ReconciliationCoordinator actor did not properly stop.", ex),
    +                                   exception
    +                           );
    +                   }
    +
    +                   if (!stopped) {
    +                           // the reconciliationCoordinator actor did not 
stop in time, let's kill him
    +                           
reconciliationCoordinator.tell(Kill.getInstance(), ActorRef.noSender());
    +                   }
    --- End diff --
    
    I think we can combine the different futures into a single future to wait 
on the shut down concurrently.


> Shutdown ResourceManager components properly
> --------------------------------------------
>
>                 Key: FLINK-7092
>                 URL: https://issues.apache.org/jira/browse/FLINK-7092
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Mesos
>            Reporter: Till Rohrmann
>            Assignee: mingleizhang
>            Priority: Minor
>              Labels: flip-6
>
> The {{MesosResourceManager}} starts internally a {{TaskMonitor}}, 
> {{LaunchCoordinator}}, {{ConnectionMonitor}} and a 
> {{ReconciliationCoordinator}}. These components have to be properly shut down 
> when the {{MesosResourceManager}} closes.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to