[FLINK-9408] Let JM try to reconnect to RM

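This commit changes the behaviour of the JobMaster (JM) so that it always tries to reconnect 
to the latest known ResourceManager (RM) address: after the RM disconnects it, after an RM 
heartbeat timeout, and when the JM is started again (e.g. after regaining leadership).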

This closes #6056.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06437970
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06437970
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06437970

Branch: refs/heads/master
Commit: 064379705f36aaea927c44bb303a867c0c66265d
Parents: 9ad868c
Author: Till Rohrmann <[email protected]>
Authored: Tue May 22 15:53:38 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed May 23 00:58:57 2018 +0200

----------------------------------------------------------------------
 .../EstablishedResourceManagerConnection.java   | 13 +--
 .../flink/runtime/jobmaster/JobMaster.java      | 91 ++++++++++++--------
 .../jobmaster/ResourceManagerAddress.java       | 77 +++++++++++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  | 90 +++++++++++++++++++
 4 files changed, 224 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06437970/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
index 46c1b4b..e64754d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 
 import javax.annotation.Nonnull;
 
@@ -30,29 +29,25 @@ import javax.annotation.Nonnull;
  */
 class EstablishedResourceManagerConnection {
 
+       @Nonnull
        private final ResourceManagerGateway resourceManagerGateway;
 
-       private final ResourceManagerId resourceManagerId;
-
+       @Nonnull
        private final ResourceID resourceManagerResourceID;
 
        EstablishedResourceManagerConnection(
                        @Nonnull ResourceManagerGateway resourceManagerGateway,
-                       @Nonnull ResourceManagerId resourceManagerId,
                        @Nonnull ResourceID resourceManagerResourceID) {
                this.resourceManagerGateway = resourceManagerGateway;
-               this.resourceManagerId = resourceManagerId;
                this.resourceManagerResourceID = resourceManagerResourceID;
        }
 
+       @Nonnull
        public ResourceManagerGateway getResourceManagerGateway() {
                return resourceManagerGateway;
        }
 
-       public ResourceManagerId getResourceManagerId() {
-               return resourceManagerId;
-       }
-
+       @Nonnull
        public ResourceID getResourceManagerResourceID() {
                return resourceManagerResourceID;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/06437970/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 5f5a9a9..1df9d89 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -131,6 +131,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution 
of a single
@@ -209,6 +210,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        private String lastInternalSavepoint;
 
        @Nullable
+       private ResourceManagerAddress resourceManagerAddress;
+
+       @Nullable
        private ResourceManagerConnection resourceManagerConnection;
 
        @Nullable
@@ -888,13 +892,13 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        final Exception cause) {
 
                if (isConnectingToResourceManager(resourceManagerId)) {
-                       closeResourceManagerConnection(cause);
+                       reconnectToResourceManager(cause);
                }
        }
 
        private boolean isConnectingToResourceManager(ResourceManagerId 
resourceManagerId) {
-               return resourceManagerConnection != null
-                               && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId);
+               return resourceManagerAddress != null
+                               && 
resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
        }
 
        @Override
@@ -999,9 +1003,10 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                setNewFencingToken(newJobMasterId);
 
+               startJobMasterServices();
+
                log.info("Starting execution of job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
 
-               startJobMasterServices();
                resetAndScheduleExecutionGraph();
 
                return Acknowledge.get();
@@ -1011,6 +1016,10 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                // start the slot pool make sure the slot pool now accepts 
messages for this leader
                slotPool.start(getFencingToken(), getAddress());
 
+               //TODO: Remove once the ZooKeeperLeaderRetrieval returns the 
stored address upon start
+               // try to reconnect to previously known leader
+               reconnectToResourceManager(new FlinkException("Starting 
JobMaster component."));
+
                // job is ready to go, try to establish connection with 
resource manager
                //   - activate leader retrieval for the resource manager
                //   - on notification of the leader, the connection will be 
established and
@@ -1072,8 +1081,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        ExecutionGraph newExecutionGraph,
                        JobManagerJobMetricGroup newJobManagerJobMetricGroup) {
                validateRunsInMainThread();
-               
Preconditions.checkState(executionGraph.getState().isTerminalState());
-               Preconditions.checkState(jobManagerJobMetricGroup == null);
+               checkState(executionGraph.getState().isTerminalState());
+               checkState(jobManagerJobMetricGroup == null);
 
                executionGraph = newExecutionGraph;
                jobManagerJobMetricGroup = newJobManagerJobMetricGroup;
@@ -1103,7 +1112,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        }
 
        private void scheduleExecutionGraph() {
-               Preconditions.checkState(jobStatusListener == null);
+               checkState(jobStatusListener == null);
                // register self as job status change listener
                jobStatusListener = new JobManagerJobStatusListener();
                executionGraph.registerJobStatusListener(jobStatusListener);
@@ -1239,33 +1248,40 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                }
        }
 
-       private void notifyOfNewResourceManagerLeader(final String 
resourceManagerAddress, final ResourceManagerId resourceManagerId) {
-               if (resourceManagerConnection != null) {
-                       if (resourceManagerAddress != null) {
-                               if (Objects.equals(resourceManagerAddress, 
resourceManagerConnection.getTargetAddress())
-                                       && Objects.equals(resourceManagerId, 
resourceManagerConnection.getTargetLeaderId())) {
-                                       // both address and leader id are not 
changed, we can keep the old connection
-                                       return;
-                               }
+       private void notifyOfNewResourceManagerLeader(final String 
newResourceManagerAddress, final ResourceManagerId resourceManagerId) {
+               resourceManagerAddress = 
createResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
 
-                               log.info("ResourceManager leader changed from 
{} to {}. Registering at new leader.",
-                                       
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+               reconnectToResourceManager(new 
FlinkException(String.format("ResourceManager leader changed to new address 
%s", resourceManagerAddress)));
+       }
 
-                               closeResourceManagerConnection(new Exception(
-                                       "ResourceManager leader changed to new 
address " + resourceManagerAddress));
-                       } else {
-                               log.info("Current ResourceManager {} lost 
leader status. Waiting for new ResourceManager leader.",
-                                       
resourceManagerConnection.getTargetAddress());
-                       }
+       @Nullable
+       private ResourceManagerAddress createResourceManagerAddress(@Nullable 
String newResourceManagerAddress, @Nullable ResourceManagerId 
resourceManagerId) {
+               if (newResourceManagerAddress != null) {
+                       // the contract is: address == null <=> id == null
+                       checkNotNull(resourceManagerId);
+                       return new 
ResourceManagerAddress(newResourceManagerAddress, resourceManagerId);
+               } else {
+                       return null;
                }
+       }
+
+       private void reconnectToResourceManager(Exception cause) {
+               closeResourceManagerConnection(cause);
+               tryConnectToResourceManager();
+       }
 
+       private void tryConnectToResourceManager() {
                if (resourceManagerAddress != null) {
-                       createResourceManagerConnection(resourceManagerAddress, 
resourceManagerId);
+                       connectToResourceManager();
                }
        }
 
-       private void createResourceManagerConnection(String 
resourceManagerAddress, ResourceManagerId resourceManagerId) {
-               log.info("Attempting to register at ResourceManager {}", 
resourceManagerAddress);
+       private void connectToResourceManager() {
+               assert(resourceManagerAddress != null);
+               assert(resourceManagerConnection == null);
+               assert(establishedResourceManagerConnection == null);
+
+               log.info("Connecting to ResourceManager {}", 
resourceManagerAddress);
 
                resourceManagerConnection = new ResourceManagerConnection(
                        log,
@@ -1273,8 +1289,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        resourceId,
                        getAddress(),
                        getFencingToken(),
-                       resourceManagerAddress,
-                       resourceManagerId,
+                       resourceManagerAddress.getAddress(),
+                       resourceManagerAddress.getResourceManagerId(),
                        scheduledExecutorService);
 
                resourceManagerConnection.start();
@@ -1295,7 +1311,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                        establishedResourceManagerConnection = new 
EstablishedResourceManagerConnection(
                                resourceManagerGateway,
-                               success.getResourceManagerId(),
                                resourceManagerResourceId);
 
                        
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
@@ -1541,7 +1556,12 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                @Override
                protected void onRegistrationSuccess(final 
JobMasterRegistrationSuccess success) {
-                       runAsync(() -> 
establishResourceManagerConnection(success));
+                       runAsync(() -> {
+                               // ignore success callbacks of connections that have since been replaced
+                               if (this == resourceManagerConnection) {
+                                       
establishResourceManagerConnection(success);
+                               }
+                       });
                }
 
                @Override
@@ -1610,14 +1630,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                log.info("The heartbeat of ResourceManager with 
id {} timed out.", resourceId);
 
                                if (establishedResourceManagerConnection != 
null && 
establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId))
 {
-                                       final String resourceManagerAddress = 
establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
-                                       final ResourceManagerId 
resourceManagerId = establishedResourceManagerConnection.getResourceManagerId();
-
-                                       closeResourceManagerConnection(
-                                               new TimeoutException(
-                                                       "The heartbeat of 
ResourceManager with id " + resourceId + " timed out."));
-
-                                       
createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+                                       reconnectToResourceManager(
+                                               new JobMasterException(
+                                                       String.format("The 
heartbeat of ResourceManager with id %s timed out.", resourceId)));
                                }
                        });
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/06437970/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
new file mode 100644
index 0000000..d549f7b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/**
+ * Current address and fencing token of the leading ResourceManager.
+ */
+public class ResourceManagerAddress {
+
+       @Nonnull
+       private final String address;
+
+       @Nonnull
+       private final ResourceManagerId resourceManagerId;
+
+       public ResourceManagerAddress(@Nonnull String address, @Nonnull 
ResourceManagerId resourceManagerId) {
+               this.address = address;
+               this.resourceManagerId = resourceManagerId;
+       }
+
+       @Nonnull
+       public String getAddress() {
+               return address;
+       }
+
+       @Nonnull
+       public ResourceManagerId getResourceManagerId() {
+               return resourceManagerId;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+
+               if (obj == null || getClass() != obj.getClass()) {
+                       return false;
+               }
+
+               ResourceManagerAddress that = (ResourceManagerAddress) obj;
+               return Objects.equals(address, that.address) &&
+                       Objects.equals(resourceManagerId, 
that.resourceManagerId);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(address, resourceManagerId);
+       }
+
+       @Override
+       public String toString() {
+               return address + '(' + resourceManagerId + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06437970/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 09640d5..99cdc16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -70,6 +70,7 @@ import 
org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matchers;
@@ -89,6 +90,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -525,6 +527,94 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that we continue reconnecting to the latest known RM after a 
disconnection
+        * message.
+        */
+       @Test
+       public void testReconnectionAfterDisconnect() throws Exception {
+               final JobMaster jobMaster = createJobMaster(
+                       JobMasterConfiguration.fromConfiguration(configuration),
+                       jobGraph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build());
+
+               final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+               jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+                       final BlockingQueue<JobMasterId> registrationsQueue = 
new ArrayBlockingQueue<>(1);
+
+                       
testingResourceManagerGateway.setRegisterJobManagerConsumer(
+                               jobMasterIdResourceIDStringJobIDTuple4 -> 
registrationsQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0));
+
+                       
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+
+                       final ResourceManagerId resourceManagerId = 
testingResourceManagerGateway.getFencingToken();
+                       rmLeaderRetrievalService.notifyListener(
+                               testingResourceManagerGateway.getAddress(),
+                               resourceManagerId.toUUID());
+
+                       // wait for first registration attempt
+                       final JobMasterId firstRegistrationAttempt = 
registrationsQueue.take();
+
+                       assertThat(firstRegistrationAttempt, 
equalTo(jobMasterId));
+
+                       assertThat(registrationsQueue.isEmpty(), is(true));
+                       
jobMasterGateway.disconnectResourceManager(resourceManagerId, new 
FlinkException("Test exception"));
+
+                       // wait for the second registration attempt after the 
disconnect call
+                       assertThat(registrationsQueue.take(), 
equalTo(jobMasterId));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
+       /**
+        * Tests that a JM connects to the leading RM after regaining 
leadership.
+        */
+       @Test
+       public void testResourceManagerConnectionAfterRegainingLeadership() 
throws Exception {
+               final JobMaster jobMaster = createJobMaster(
+                       JobMasterConfiguration.fromConfiguration(configuration),
+                       jobGraph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build());
+
+               jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+                       final BlockingQueue<JobMasterId> registrationQueue = 
new ArrayBlockingQueue<>(1);
+                       
testingResourceManagerGateway.setRegisterJobManagerConsumer(
+                               jobMasterIdResourceIDStringJobIDTuple4 -> 
registrationQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0));
+
+                       final String resourceManagerAddress = 
testingResourceManagerGateway.getAddress();
+                       rpcService.registerGateway(resourceManagerAddress, 
testingResourceManagerGateway);
+
+                       
rmLeaderRetrievalService.notifyListener(resourceManagerAddress, 
testingResourceManagerGateway.getFencingToken().toUUID());
+
+                       final JobMasterId firstRegistrationAttempt = 
registrationQueue.take();
+
+                       assertThat(firstRegistrationAttempt, 
equalTo(jobMasterId));
+
+                       jobMaster.suspend(new FlinkException("Test 
exception."), testingTimeout).get();
+
+                       final JobMasterId jobMasterId2 = JobMasterId.generate();
+
+                       jobMaster.start(jobMasterId2, testingTimeout).get();
+
+                       final JobMasterId secondRegistrationAttempt = 
registrationQueue.take();
+
+                       assertThat(secondRegistrationAttempt, 
equalTo(jobMasterId2));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
        private File createSavepoint(long savepointId) throws IOException {
                final File savepointFile = temporaryFolder.newFile();
                final SavepointV2 savepoint = new SavepointV2(savepointId, 
Collections.emptyList(), Collections.emptyList());

Reply via email to