[ 
https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581063#comment-16581063
 ] 

ASF GitHub Bot commented on FLINK-9936:
---------------------------------------

tillrohrmann closed pull request #6451: [FLINK-9936] Resource manager connect to mesos after leadership granted.
URL: https://github.com/apache/flink/pull/6451
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index e24214d28c1..b2606e46fbc 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -221,10 +221,14 @@ protected ActorRef createReconciliationCoordinator(SchedulerDriver schedulerDriv
        // ------------------------------------------------------------------------
 
        /**
-        * Starts the Mesos-specifics.
+        * Do nothing and all work has been moved to on leadership granted callback.
         */
        @Override
        protected void initialize() throws ResourceManagerException {
+       }
+
+       @Override
+       protected void onLeaderShipGranted() throws Exception {
                // create and start the worker store
                try {
                        this.workerStore = mesosServices.createMesosWorkerStore(flinkConfig, getRpcService().getExecutor());
@@ -283,7 +287,14 @@ protected void initialize() throws ResourceManagerException {
                connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
                schedulerDriver.start();
 
-               LOG.info("Mesos resource manager initialized.");
+               LOG.info("Mesos resource manager started.");
+       }
+
+       @Override
+       protected void onLeaderShipRevoked() throws Exception {
+               workerStore.stop(false);
+               schedulerDriver.stop(true);
+               disconnected(new Disconnected());
        }
 
        /**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a992632b666..081da27424d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -899,6 +899,12 @@ public void grantLeadership(final UUID newLeaderSessionID) {
 
                                setFencingToken(newResourceManagerId);
 
+                               try {
+                                       onLeaderShipGranted();
+                               } catch (Exception e) {
+                                       onFatalError(e);
+                               }
+
                                slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
 
                                getRpcService().execute(
@@ -919,6 +925,12 @@ public void revokeLeadership() {
 
                                clearState();
 
+                               try {
+                                       onLeaderShipRevoked();
+                               } catch (Exception e) {
+                                       onFatalError(e);
+                               }
+
                                setFencingToken(null);
 
                                slotManager.suspend();
@@ -946,6 +958,18 @@ public void handleError(final Exception exception) {
         */
        protected abstract void initialize() throws ResourceManagerException;
 
+       /**
+        * Called when leadership is granted.
+        * @throws Exception which occurs during granting leadership and causes the resource manager to fail.
+        */
+       protected void onLeaderShipGranted() throws Exception {}
+
+       /**
+        * Called when leadership is revoked.
+        * @throws Exception which occurs during revoking leadership and causes the resource manager to fail.
+        */
+       protected void onLeaderShipRevoked() throws Exception {}
+
        /**
         * The framework specific code to deregister the application. This should report the
         * application's final status and shut down the resource manager cleanly.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Mesos resource manager unable to connect to master after failover
> -----------------------------------------------------------------
>
>                 Key: FLINK-9936
>                 URL: https://issues.apache.org/jira/browse/FLINK-9936
>             Project: Flink
>          Issue Type: Bug
>          Components: Mesos, Scheduler
>    Affects Versions: 1.5.0, 1.5.1, 1.6.0
>            Reporter: Renjie Liu
>            Assignee: Gary Yao
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.5.3, 1.6.0
>
>
> When deployed in Mesos session cluster mode, the connection monitor keeps
> reporting that it is unable to connect to Mesos after a restart. In fact, the
> scheduler driver has already connected to the Mesos master, but the connected
> message is lost. This is because leadership has not been granted yet and the
> fencing token is not set, so the RPC service ignores the connected message.
> We should therefore connect to the Mesos master only after leadership is
> granted.
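
The failure mode described above can be illustrated with a small, self-contained
sketch. This is not Flink's code: FencingSketch, FencedEndpoint and
LateConnectingEndpoint are hypothetical stand-ins, and the "Connected" string
only simulates the message a scheduler driver would send back. It assumes that
a fenced endpoint drops any message that arrives while no fencing token is set,
which is the behaviour the issue describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical illustration of fenced message handling; not Flink's actual classes. */
final class FencingSketch {

    /** Endpoint that drops messages while no matching fencing token is set. */
    static class FencedEndpoint {
        private UUID fencingToken; // null until leadership is granted
        final List<String> handled = new ArrayList<>();
        final List<String> dropped = new ArrayList<>();

        /** Handles a message only if it carries the current fencing token. */
        void receive(UUID token, String message) {
            if (fencingToken != null && fencingToken.equals(token)) {
                handled.add(message);
            } else {
                dropped.add(message);
            }
        }

        UUID getFencingToken() {
            return fencingToken;
        }

        /** Sets the token first, then runs the hook, mirroring the order in the diff. */
        void grantLeadership(UUID newToken) {
            fencingToken = newToken;
            onLeadershipGranted();
        }

        /** Hook for subclasses; does nothing by default. */
        protected void onLeadershipGranted() {}
    }

    /** Connects to the simulated master only once leadership has been granted. */
    static class LateConnectingEndpoint extends FencedEndpoint {
        @Override
        protected void onLeadershipGranted() {
            // The simulated driver answers immediately with a message to self.
            receive(getFencingToken(), "Connected");
        }
    }

    public static void main(String[] args) {
        // Failure mode: the connection is made during initialization, before leadership.
        FencedEndpoint early = new FencedEndpoint();
        early.receive(null, "Connected"); // no token yet, so the message is dropped
        early.grantLeadership(UUID.randomUUID());
        System.out.println("early: handled=" + early.handled + " dropped=" + early.dropped);

        // Fixed ordering: the connection is made from the leadership callback.
        FencedEndpoint late = new LateConnectingEndpoint();
        late.grantLeadership(UUID.randomUUID());
        System.out.println("late: handled=" + late.handled + " dropped=" + late.dropped);
    }
}
```

In the first case the message lands in the dropped list and is never replayed,
which matches the symptom of a monitor that keeps reporting a missing
connection. In the second case the token is already set when the message
arrives, so it is handled.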



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
