[FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection

An NPE occurred when trying to disconnect an unestablished ResourceManager
connection. To fix this problem, we now check whether the connection has
been established before disconnecting from the ResourceManager.

This closes #6011.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a95ec5ac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a95ec5ac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a95ec5ac

Branch: refs/heads/master
Commit: a95ec5acf259884347ae539913bcffcad5bfc340
Parents: f4e0368
Author: Till Rohrmann <[email protected]>
Authored: Mon May 14 14:14:45 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue May 15 00:23:10 2018 +0200

----------------------------------------------------------------------
 .../EstablishedResourceManagerConnection.java   | 59 ++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 83 +++++++++++---------
 .../flink/runtime/jobmaster/JobMasterTest.java  | 48 +++++++++++
 3 files changed, 155 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
new file mode 100644
index 0000000..46c1b4b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Class which contains the connection details of an established
+ * connection with the ResourceManager.
+ */
+class EstablishedResourceManagerConnection {
+
+       private final ResourceManagerGateway resourceManagerGateway;
+
+       private final ResourceManagerId resourceManagerId;
+
+       private final ResourceID resourceManagerResourceID;
+
+       EstablishedResourceManagerConnection(
+                       @Nonnull ResourceManagerGateway resourceManagerGateway,
+                       @Nonnull ResourceManagerId resourceManagerId,
+                       @Nonnull ResourceID resourceManagerResourceID) {
+               this.resourceManagerGateway = resourceManagerGateway;
+               this.resourceManagerId = resourceManagerId;
+               this.resourceManagerResourceID = resourceManagerResourceID;
+       }
+
+       public ResourceManagerGateway getResourceManagerGateway() {
+               return resourceManagerGateway;
+       }
+
+       public ResourceManagerId getResourceManagerId() {
+               return resourceManagerId;
+       }
+
+       public ResourceID getResourceManagerResourceID() {
+               return resourceManagerResourceID;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f30c119..aff3280 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -191,9 +191,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        private LeaderRetrievalService resourceManagerLeaderRetriever;
 
-       @Nullable
-       private ResourceManagerConnection resourceManagerConnection;
-
        // --------- TaskManagers --------
 
        private final Map<ResourceID, Tuple2<TaskManagerLocation, 
TaskExecutorGateway>> registeredTaskManagers;
@@ -211,6 +208,12 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        @Nullable
        private String lastInternalSavepoint;
 
+       @Nullable
+       private ResourceManagerConnection resourceManagerConnection;
+
+       @Nullable
+       private EstablishedResourceManagerConnection 
establishedResourceManagerConnection;
+
        // 
------------------------------------------------------------------------
 
        public JobMaster(
@@ -290,6 +293,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                this.jobManagerJobMetricGroup = 
jobMetricGroupFactory.create(jobGraph);
                this.executionGraph = 
createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
                this.jobStatusListener = null;
+
+               this.resourceManagerConnection = null;
+               this.establishedResourceManagerConnection = null;
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -881,12 +887,16 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        final ResourceManagerId resourceManagerId,
                        final Exception cause) {
 
-               if (resourceManagerConnection != null
-                               && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) {
+               if (isConnectingToResourceManager(resourceManagerId)) {
                        closeResourceManagerConnection(cause);
                }
        }
 
+       private boolean isConnectingToResourceManager(ResourceManagerId 
resourceManagerId) {
+               return resourceManagerConnection != null
+                               && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId);
+       }
+
        @Override
        public void heartbeatFromTaskManager(final ResourceID resourceID, 
AccumulatorReport accumulatorReport) {
                taskManagerHeartbeatManager.receiveHeartbeat(resourceID, 
accumulatorReport);
@@ -1238,11 +1248,11 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                        return;
                                }
 
-                               closeResourceManagerConnection(new Exception(
-                                       "ResourceManager leader changed to new 
address " + resourceManagerAddress));
-
                                log.info("ResourceManager leader changed from 
{} to {}. Registering at new leader.",
                                        
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+
+                               closeResourceManagerConnection(new Exception(
+                                       "ResourceManager leader changed to new 
address " + resourceManagerAddress));
                        } else {
                                log.info("Current ResourceManager {} lost 
leader status. Waiting for new ResourceManager leader.",
                                        
resourceManagerConnection.getTargetAddress());
@@ -1277,9 +1287,16 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                        final ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
 
+                       final ResourceID resourceManagerResourceId = 
success.getResourceManagerResourceId();
+
+                       establishedResourceManagerConnection = new 
EstablishedResourceManagerConnection(
+                               resourceManagerGateway,
+                               success.getResourceManagerId(),
+                               resourceManagerResourceId);
+
                        
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
-                       
resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(),
 new HeartbeatTarget<Void>() {
+                       
resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new 
HeartbeatTarget<Void>() {
                                @Override
                                public void receiveHeartbeat(ResourceID 
resourceID, Void payload) {
                                        
resourceManagerGateway.heartbeatFromJobManager(resourceID);
@@ -1297,22 +1314,31 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        }
 
        private void closeResourceManagerConnection(Exception cause) {
-               if (resourceManagerConnection != null) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Close ResourceManager connection 
{}.", resourceManagerConnection.getResourceManagerResourceID(), cause);
-                       } else {
-                               log.info("Close ResourceManager connection {}: 
{}.", resourceManagerConnection.getResourceManagerResourceID(), 
cause.getMessage());
-                       }
-
-                       
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());
-
-                       ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
-                       
resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(),
 cause);
+               if (establishedResourceManagerConnection != null) {
+                       
dissolveResourceManagerConnection(establishedResourceManagerConnection, cause);
+                       establishedResourceManagerConnection = null;
+               }
 
+               if (resourceManagerConnection != null) {
+                       // stop a potentially ongoing registration process
                        resourceManagerConnection.close();
                        resourceManagerConnection = null;
                }
+       }
+
+       private void 
dissolveResourceManagerConnection(EstablishedResourceManagerConnection 
establishedResourceManagerConnection, Exception cause) {
+               final ResourceID resourceManagerResourceID = 
establishedResourceManagerConnection.getResourceManagerResourceID();
 
+               if (log.isDebugEnabled()) {
+                       log.debug("Close ResourceManager connection {}.", 
resourceManagerResourceID, cause);
+               } else {
+                       log.info("Close ResourceManager connection {}: {}.", 
resourceManagerResourceID, cause.getMessage());
+               }
+
+               
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID);
+
+               ResourceManagerGateway resourceManagerGateway = 
establishedResourceManagerConnection.getResourceManagerGateway();
+               
resourceManagerGateway.disconnectJobManager(jobGraph.getJobID(), cause);
                slotPoolGateway.disconnectResourceManager();
        }
 
@@ -1473,8 +1499,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                private final JobMasterId jobMasterId;
 
-               private ResourceID resourceManagerResourceID;
-
                ResourceManagerConnection(
                                final Logger log,
                                final JobID jobID,
@@ -1498,7 +1522,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                        getTargetAddress(), 
getTargetLeaderId()) {
                                @Override
                                protected 
CompletableFuture<RegistrationResponse> invokeRegistration(
-                                               ResourceManagerGateway gateway, 
ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
+                                               ResourceManagerGateway gateway, 
ResourceManagerId fencingToken, long timeoutMillis) {
                                        Time timeout = 
Time.milliseconds(timeoutMillis);
 
                                        return gateway.registerJobManager(
@@ -1513,24 +1537,13 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                @Override
                protected void onRegistrationSuccess(final 
JobMasterRegistrationSuccess success) {
-                       runAsync(() -> {
-                               resourceManagerResourceID = 
success.getResourceManagerResourceId();
-                               establishResourceManagerConnection(success);
-                       });
+                       runAsync(() -> 
establishResourceManagerConnection(success));
                }
 
                @Override
                protected void onRegistrationFailure(final Throwable failure) {
                        handleJobMasterError(failure);
                }
-
-               public ResourceID getResourceManagerResourceID() {
-                       return resourceManagerResourceID;
-               }
-
-               public JobID getJobID() {
-                       return jobID;
-               }
        }
 
        
//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 2f61681..c0c9162 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -361,6 +362,53 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that we can close an unestablished ResourceManager connection.
+        */
+       @Test
+       public void testCloseUnestablishedResourceManagerConnection() throws 
Exception {
+               final JobMaster jobMaster = createJobMaster(
+                       JobMasterConfiguration.fromConfiguration(configuration),
+                       jobGraph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build());
+
+               try {
+                       jobMaster.start(JobMasterId.generate(), 
testingTimeout).get();
+                       final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
+                       final String firstResourceManagerAddress = "address1";
+                       final String secondResourceManagerAddress = "address2";
+
+                       final TestingResourceManagerGateway 
firstResourceManagerGateway = new TestingResourceManagerGateway();
+                       final TestingResourceManagerGateway 
secondResourceManagerGateway = new TestingResourceManagerGateway();
+
+                       rpcService.registerGateway(firstResourceManagerAddress, 
firstResourceManagerGateway);
+                       
rpcService.registerGateway(secondResourceManagerAddress, 
secondResourceManagerGateway);
+
+                       final OneShotLatch firstJobManagerRegistration = new 
OneShotLatch();
+                       final OneShotLatch secondJobManagerRegistration = new 
OneShotLatch();
+
+                       
firstResourceManagerGateway.setRegisterJobManagerConsumer(
+                               jobMasterIdResourceIDStringJobIDTuple4 -> 
firstJobManagerRegistration.trigger());
+
+                       
secondResourceManagerGateway.setRegisterJobManagerConsumer(
+                               jobMasterIdResourceIDStringJobIDTuple4 -> 
secondJobManagerRegistration.trigger());
+
+                       
rmLeaderRetrievalService.notifyListener(firstResourceManagerAddress, 
resourceManagerId.toUUID());
+
+                       // wait until we have seen the first registration 
attempt
+                       firstJobManagerRegistration.await();
+
+                       // this should stop the connection attempts towards the 
first RM
+                       
rmLeaderRetrievalService.notifyListener(secondResourceManagerAddress, 
resourceManagerId.toUUID());
+
+                       // check that we start registering at the second RM
+                       secondJobManagerRegistration.await();
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
        private File createSavepoint(long savepointId) throws IOException {
                final File savepointFile = temporaryFolder.newFile();
                final SavepointV2 savepoint = new SavepointV2(savepointId, 
Collections.emptyList(), Collections.emptyList());

Reply via email to