This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6b9b242988254d3adb28ff614b37d0874cf22ba7
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Feb 22 12:24:04 2019 +0100

    [FLINK-11718] Add onStart method to ResourceManager
---
 .../runtime/resourcemanager/ResourceManager.java   | 52 +++++++++++++++-------
 1 file changed, 35 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 396dd4e..957c991 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -202,31 +202,54 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
        // ------------------------------------------------------------------------
 
        @Override
-       public void start() throws Exception {
-               // start a leader
-               super.start();
+       public void onStart() throws Exception {
+               try {
+                       startResourceManagerServices();
+               } catch (Exception e) {
+                       final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), e);
+                       onFatalError(exception);
+                       throw exception;
+               }
+       }
 
-               leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
+       private void startResourceManagerServices() throws Exception {
+               try {
+                       leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 
-               initialize();
+                       initialize();
 
-               try {
                        leaderElectionService.start(this);
+                       jobLeaderIdService.start(new JobLeaderIdActionsImpl());
+
+                       registerSlotAndTaskExecutorMetrics();
                } catch (Exception e) {
-                       throw new ResourceManagerException("Could not start the leader election service.", e);
+                       handleStartResourceManagerServicesException(e);
                }
+       }
 
+       private void handleStartResourceManagerServicesException(Exception e) throws Exception {
                try {
-                       jobLeaderIdService.start(new JobLeaderIdActionsImpl());
-               } catch (Exception e) {
-                       throw new ResourceManagerException("Could not start the job leader id service.", e);
+                       stopResourceManagerServices();
+               } catch (Exception inner) {
+                       e.addSuppressed(inner);
                }
 
-               registerSlotAndTaskExecutorMetrics();
+               throw e;
        }
 
        @Override
        public CompletableFuture<Void> onStop() {
+               try {
+                       stopResourceManagerServices();
+               } catch (Exception exception) {
+                       return FutureUtils.completedExceptionally(
+                               new FlinkException("Could not properly shut down the ResourceManager.", exception));
+               }
+
+               return CompletableFuture.completedFuture(null);
+       }
+
+       private void stopResourceManagerServices() throws Exception {
                Exception exception = null;
 
                taskManagerHeartbeatManager.stop();
@@ -253,12 +276,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 
                clearStateInternal();
 
-               if (exception != null) {
-                       return FutureUtils.completedExceptionally(
-                               new FlinkException("Could not properly shut down the ResourceManager.", exception));
-               } else {
-                       return CompletableFuture.completedFuture(null);
-               }
+               ExceptionUtils.tryRethrowException(exception);
        }
 
        // ------------------------------------------------------------------------

Reply via email to