This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6b9b242988254d3adb28ff614b37d0874cf22ba7 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Feb 22 12:24:04 2019 +0100 [FLINK-11718] Add onStart method to ResourceManager --- .../runtime/resourcemanager/ResourceManager.java | 52 +++++++++++++++------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 396dd4e..957c991 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -202,31 +202,54 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> // ------------------------------------------------------------------------ @Override - public void start() throws Exception { - // start a leader - super.start(); + public void onStart() throws Exception { + try { + startResourceManagerServices(); + } catch (Exception e) { + final ResourceManagerException exception = new ResourceManagerException(String.format("Could not start the ResourceManager %s", getAddress()), e); + onFatalError(exception); + throw exception; + } + } - leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); + private void startResourceManagerServices() throws Exception { + try { + leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); - initialize(); + initialize(); - try { leaderElectionService.start(this); + jobLeaderIdService.start(new JobLeaderIdActionsImpl()); + + registerSlotAndTaskExecutorMetrics(); } catch (Exception e) { - throw new ResourceManagerException("Could not start the leader election service.", e); + handleStartResourceManagerServicesException(e); } + } + private void handleStartResourceManagerServicesException(Exception e) throws Exception { try { - jobLeaderIdService.start(new JobLeaderIdActionsImpl()); - } catch (Exception e) { - throw new ResourceManagerException("Could not start the job leader id service.", e); + stopResourceManagerServices(); + } catch (Exception inner) { + e.addSuppressed(inner); } - registerSlotAndTaskExecutorMetrics(); + throw e; } @Override public CompletableFuture<Void> onStop() { + try { + stopResourceManagerServices(); + } catch (Exception exception) { + return FutureUtils.completedExceptionally( + new FlinkException("Could not properly shut down the ResourceManager.", exception)); + } + + return CompletableFuture.completedFuture(null); + } + + private void stopResourceManagerServices() throws Exception { Exception exception = null; taskManagerHeartbeatManager.stop(); @@ -253,12 +276,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> clearStateInternal(); - if (exception != null) { - return FutureUtils.completedExceptionally( - new FlinkException("Could not properly shut down the ResourceManager.", exception)); - } else { - return CompletableFuture.completedFuture(null); - } + ExceptionUtils.tryRethrowException(exception); } // ------------------------------------------------------------------------
