[FLINK-9358] Avoid NPE when closing an unestablished ResourceManager connection
An NPE occurred when trying to disconnect an unestablished ResourceManager connection. In order to fix this problem, we now check whether the connection has been established or not. This closes #6011. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a95ec5ac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a95ec5ac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a95ec5ac Branch: refs/heads/master Commit: a95ec5acf259884347ae539913bcffcad5bfc340 Parents: f4e0368 Author: Till Rohrmann <[email protected]> Authored: Mon May 14 14:14:45 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue May 15 00:23:10 2018 +0200 ---------------------------------------------------------------------- .../EstablishedResourceManagerConnection.java | 59 ++++++++++++++ .../flink/runtime/jobmaster/JobMaster.java | 83 +++++++++++--------- .../flink/runtime/jobmaster/JobMasterTest.java | 48 +++++++++++ 3 files changed, 155 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java new file mode 100644 index 0000000..46c1b4b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; + +import javax.annotation.Nonnull; + +/** + * Class which contains the connection details of an established + * connection with the ResourceManager. + */ +class EstablishedResourceManagerConnection { + + private final ResourceManagerGateway resourceManagerGateway; + + private final ResourceManagerId resourceManagerId; + + private final ResourceID resourceManagerResourceID; + + EstablishedResourceManagerConnection( + @Nonnull ResourceManagerGateway resourceManagerGateway, + @Nonnull ResourceManagerId resourceManagerId, + @Nonnull ResourceID resourceManagerResourceID) { + this.resourceManagerGateway = resourceManagerGateway; + this.resourceManagerId = resourceManagerId; + this.resourceManagerResourceID = resourceManagerResourceID; + } + + public ResourceManagerGateway getResourceManagerGateway() { + return resourceManagerGateway; + } + + public ResourceManagerId getResourceManagerId() { + return resourceManagerId; + } + + public ResourceID getResourceManagerResourceID() { + return resourceManagerResourceID; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index f30c119..aff3280 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -191,9 +191,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private LeaderRetrievalService resourceManagerLeaderRetriever; - @Nullable - private ResourceManagerConnection resourceManagerConnection; - // --------- TaskManagers -------- private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers; @@ -211,6 +208,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast @Nullable private String lastInternalSavepoint; + @Nullable + private ResourceManagerConnection resourceManagerConnection; + + @Nullable + private EstablishedResourceManagerConnection establishedResourceManagerConnection; + // ------------------------------------------------------------------------ public JobMaster( @@ -290,6 +293,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph); this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup); this.jobStatusListener = null; + + this.resourceManagerConnection = null; + this.establishedResourceManagerConnection = null; } //---------------------------------------------------------------------------------------------- @@ -881,12 +887,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast final ResourceManagerId 
resourceManagerId, final Exception cause) { - if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) { + if (isConnectingToResourceManager(resourceManagerId)) { closeResourceManagerConnection(cause); } } + private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) { + return resourceManagerConnection != null + && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId); + } + @Override public void heartbeatFromTaskManager(final ResourceID resourceID, AccumulatorReport accumulatorReport) { taskManagerHeartbeatManager.receiveHeartbeat(resourceID, accumulatorReport); @@ -1238,11 +1248,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast return; } - closeResourceManagerConnection(new Exception( - "ResourceManager leader changed to new address " + resourceManagerAddress)); - log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", resourceManagerConnection.getTargetAddress(), resourceManagerAddress); + + closeResourceManagerConnection(new Exception( + "ResourceManager leader changed to new address " + resourceManagerAddress)); } else { log.info("Current ResourceManager {} lost leader status. 
Waiting for new ResourceManager leader.", resourceManagerConnection.getTargetAddress()); @@ -1277,9 +1287,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + final ResourceID resourceManagerResourceId = success.getResourceManagerResourceId(); + + establishedResourceManagerConnection = new EstablishedResourceManagerConnection( + resourceManagerGateway, + success.getResourceManagerId(), + resourceManagerResourceId); + slotPoolGateway.connectToResourceManager(resourceManagerGateway); - resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() { + resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<Void>() { @Override public void receiveHeartbeat(ResourceID resourceID, Void payload) { resourceManagerGateway.heartbeatFromJobManager(resourceID); @@ -1297,22 +1314,31 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } private void closeResourceManagerConnection(Exception cause) { - if (resourceManagerConnection != null) { - if (log.isDebugEnabled()) { - log.debug("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause); - } else { - log.info("Close ResourceManager connection {}: {}.", resourceManagerConnection.getResourceManagerResourceID(), cause.getMessage()); - } - - resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID()); - - ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); - resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(), cause); + if (establishedResourceManagerConnection != null) { + dissolveResourceManagerConnection(establishedResourceManagerConnection, cause); + establishedResourceManagerConnection = null; + } + if 
(resourceManagerConnection != null) { + // stop a potentially ongoing registration process resourceManagerConnection.close(); resourceManagerConnection = null; } + } + + private void dissolveResourceManagerConnection(EstablishedResourceManagerConnection establishedResourceManagerConnection, Exception cause) { + final ResourceID resourceManagerResourceID = establishedResourceManagerConnection.getResourceManagerResourceID(); + if (log.isDebugEnabled()) { + log.debug("Close ResourceManager connection {}.", resourceManagerResourceID, cause); + } else { + log.info("Close ResourceManager connection {}: {}.", resourceManagerResourceID, cause.getMessage()); + } + + resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerResourceID); + + ResourceManagerGateway resourceManagerGateway = establishedResourceManagerConnection.getResourceManagerGateway(); + resourceManagerGateway.disconnectJobManager(jobGraph.getJobID(), cause); slotPoolGateway.disconnectResourceManager(); } @@ -1473,8 +1499,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private final JobMasterId jobMasterId; - private ResourceID resourceManagerResourceID; - ResourceManagerConnection( final Logger log, final JobID jobID, @@ -1498,7 +1522,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast getTargetAddress(), getTargetLeaderId()) { @Override protected CompletableFuture<RegistrationResponse> invokeRegistration( - ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception { + ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) { Time timeout = Time.milliseconds(timeoutMillis); return gateway.registerJobManager( @@ -1513,24 +1537,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast @Override protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) { - runAsync(() -> { - resourceManagerResourceID = 
success.getResourceManagerResourceId(); - establishResourceManagerConnection(success); - }); + runAsync(() -> establishResourceManagerConnection(success)); } @Override protected void onRegistrationFailure(final Throwable failure) { handleJobMasterError(failure); } - - public ResourceID getResourceManagerResourceID() { - return resourceManagerResourceID; - } - - public JobID getJobID() { - return jobID; - } } //---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a95ec5ac/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 2f61681..c0c9162 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.CheckpointProperties; @@ -361,6 +362,53 @@ public class JobMasterTest extends TestLogger { } } + /** + * Tests that we can close an unestablished ResourceManager connection. 
+ */ + @Test + public void testCloseUnestablishedResourceManagerConnection() throws Exception { + final JobMaster jobMaster = createJobMaster( + JobMasterConfiguration.fromConfiguration(configuration), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build()); + + try { + jobMaster.start(JobMasterId.generate(), testingTimeout).get(); + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); + final String firstResourceManagerAddress = "address1"; + final String secondResourceManagerAddress = "address2"; + + final TestingResourceManagerGateway firstResourceManagerGateway = new TestingResourceManagerGateway(); + final TestingResourceManagerGateway secondResourceManagerGateway = new TestingResourceManagerGateway(); + + rpcService.registerGateway(firstResourceManagerAddress, firstResourceManagerGateway); + rpcService.registerGateway(secondResourceManagerAddress, secondResourceManagerGateway); + + final OneShotLatch firstJobManagerRegistration = new OneShotLatch(); + final OneShotLatch secondJobManagerRegistration = new OneShotLatch(); + + firstResourceManagerGateway.setRegisterJobManagerConsumer( + jobMasterIdResourceIDStringJobIDTuple4 -> firstJobManagerRegistration.trigger()); + + secondResourceManagerGateway.setRegisterJobManagerConsumer( + jobMasterIdResourceIDStringJobIDTuple4 -> secondJobManagerRegistration.trigger()); + + rmLeaderRetrievalService.notifyListener(firstResourceManagerAddress, resourceManagerId.toUUID()); + + // wait until we have seen the first registration attempt + firstJobManagerRegistration.await(); + + // this should stop the connection attempts towards the first RM + rmLeaderRetrievalService.notifyListener(secondResourceManagerAddress, resourceManagerId.toUUID()); + + // check that we start registering at the second RM + secondJobManagerRegistration.await(); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + private File createSavepoint(long savepointId) throws 
IOException { final File savepointFile = temporaryFolder.newFile(); final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());
