[FLINK-9408] Let JM try to reconnect to RM. This commit changes the behaviour of the JobMaster (JM) so that it always tries to reconnect to the latest known ResourceManager (RM) address.
This closes #6056. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06437970 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06437970 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06437970 Branch: refs/heads/master Commit: 064379705f36aaea927c44bb303a867c0c66265d Parents: 9ad868c Author: Till Rohrmann <[email protected]> Authored: Tue May 22 15:53:38 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Wed May 23 00:58:57 2018 +0200 ---------------------------------------------------------------------- .../EstablishedResourceManagerConnection.java | 13 +-- .../flink/runtime/jobmaster/JobMaster.java | 91 ++++++++++++-------- .../jobmaster/ResourceManagerAddress.java | 77 +++++++++++++++++ .../flink/runtime/jobmaster/JobMasterTest.java | 90 +++++++++++++++++++ 4 files changed, 224 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/06437970/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java index 46c1b4b..e64754d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/EstablishedResourceManagerConnection.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; -import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import 
javax.annotation.Nonnull; @@ -30,29 +29,25 @@ import javax.annotation.Nonnull; */ class EstablishedResourceManagerConnection { + @Nonnull private final ResourceManagerGateway resourceManagerGateway; - private final ResourceManagerId resourceManagerId; - + @Nonnull private final ResourceID resourceManagerResourceID; EstablishedResourceManagerConnection( @Nonnull ResourceManagerGateway resourceManagerGateway, - @Nonnull ResourceManagerId resourceManagerId, @Nonnull ResourceID resourceManagerResourceID) { this.resourceManagerGateway = resourceManagerGateway; - this.resourceManagerId = resourceManagerId; this.resourceManagerResourceID = resourceManagerResourceID; } + @Nonnull public ResourceManagerGateway getResourceManagerGateway() { return resourceManagerGateway; } - public ResourceManagerId getResourceManagerId() { - return resourceManagerId; - } - + @Nonnull public ResourceID getResourceManagerResourceID() { return resourceManagerResourceID; } http://git-wip-us.apache.org/repos/asf/flink/blob/06437970/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 5f5a9a9..1df9d89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -131,6 +131,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * JobMaster implementation. 
The job master is responsible for the execution of a single @@ -209,6 +210,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private String lastInternalSavepoint; @Nullable + private ResourceManagerAddress resourceManagerAddress; + + @Nullable private ResourceManagerConnection resourceManagerConnection; @Nullable @@ -888,13 +892,13 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast final Exception cause) { if (isConnectingToResourceManager(resourceManagerId)) { - closeResourceManagerConnection(cause); + reconnectToResourceManager(cause); } } private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) { - return resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId); + return resourceManagerAddress != null + && resourceManagerAddress.getResourceManagerId().equals(resourceManagerId); } @Override @@ -999,9 +1003,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast setNewFencingToken(newJobMasterId); + startJobMasterServices(); + log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID()); - startJobMasterServices(); resetAndScheduleExecutionGraph(); return Acknowledge.get(); @@ -1011,6 +1016,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast // start the slot pool make sure the slot pool now accepts messages for this leader slotPool.start(getFencingToken(), getAddress()); + //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start + // try to reconnect to previously known leader + reconnectToResourceManager(new FlinkException("Starting JobMaster component.")); + // job is ready to go, try to establish connection with resource manager // - activate leader retrieval for the resource manager // - on notification of the leader, the connection will be established and @@ -1072,8 +1081,8 @@ public class 
JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast ExecutionGraph newExecutionGraph, JobManagerJobMetricGroup newJobManagerJobMetricGroup) { validateRunsInMainThread(); - Preconditions.checkState(executionGraph.getState().isTerminalState()); - Preconditions.checkState(jobManagerJobMetricGroup == null); + checkState(executionGraph.getState().isTerminalState()); + checkState(jobManagerJobMetricGroup == null); executionGraph = newExecutionGraph; jobManagerJobMetricGroup = newJobManagerJobMetricGroup; @@ -1103,7 +1112,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } private void scheduleExecutionGraph() { - Preconditions.checkState(jobStatusListener == null); + checkState(jobStatusListener == null); // register self as job status change listener jobStatusListener = new JobManagerJobStatusListener(); executionGraph.registerJobStatusListener(jobStatusListener); @@ -1239,33 +1248,40 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } } - private void notifyOfNewResourceManagerLeader(final String resourceManagerAddress, final ResourceManagerId resourceManagerId) { - if (resourceManagerConnection != null) { - if (resourceManagerAddress != null) { - if (Objects.equals(resourceManagerAddress, resourceManagerConnection.getTargetAddress()) - && Objects.equals(resourceManagerId, resourceManagerConnection.getTargetLeaderId())) { - // both address and leader id are not changed, we can keep the old connection - return; - } + private void notifyOfNewResourceManagerLeader(final String newResourceManagerAddress, final ResourceManagerId resourceManagerId) { + resourceManagerAddress = createResourceManagerAddress(newResourceManagerAddress, resourceManagerId); - log.info("ResourceManager leader changed from {} to {}. 
Registering at new leader.", - resourceManagerConnection.getTargetAddress(), resourceManagerAddress); + reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress))); + } - closeResourceManagerConnection(new Exception( - "ResourceManager leader changed to new address " + resourceManagerAddress)); - } else { - log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", - resourceManagerConnection.getTargetAddress()); - } + @Nullable + private ResourceManagerAddress createResourceManagerAddress(@Nullable String newResourceManagerAddress, @Nullable ResourceManagerId resourceManagerId) { + if (newResourceManagerAddress != null) { + // the contract is: address == null <=> id == null + checkNotNull(resourceManagerId); + return new ResourceManagerAddress(newResourceManagerAddress, resourceManagerId); + } else { + return null; } + } + + private void reconnectToResourceManager(Exception cause) { + closeResourceManagerConnection(cause); + tryConnectToResourceManager(); + } + private void tryConnectToResourceManager() { if (resourceManagerAddress != null) { - createResourceManagerConnection(resourceManagerAddress, resourceManagerId); + connectToResourceManager(); } } - private void createResourceManagerConnection(String resourceManagerAddress, ResourceManagerId resourceManagerId) { - log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); + private void connectToResourceManager() { + assert(resourceManagerAddress != null); + assert(resourceManagerConnection == null); + assert(establishedResourceManagerConnection == null); + + log.info("Connecting to ResourceManager {}", resourceManagerAddress); resourceManagerConnection = new ResourceManagerConnection( log, @@ -1273,8 +1289,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast resourceId, getAddress(), getFencingToken(), - resourceManagerAddress, - 
resourceManagerId, + resourceManagerAddress.getAddress(), + resourceManagerAddress.getResourceManagerId(), scheduledExecutorService); resourceManagerConnection.start(); @@ -1295,7 +1311,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast establishedResourceManagerConnection = new EstablishedResourceManagerConnection( resourceManagerGateway, - success.getResourceManagerId(), resourceManagerResourceId); slotPoolGateway.connectToResourceManager(resourceManagerGateway); @@ -1541,7 +1556,12 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast @Override protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) { - runAsync(() -> establishResourceManagerConnection(success)); + runAsync(() -> { + // filter out replace connections + if (this == resourceManagerConnection) { + establishResourceManagerConnection(success); + } + }); } @Override @@ -1610,14 +1630,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId); if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) { - final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress(); - final ResourceManagerId resourceManagerId = establishedResourceManagerConnection.getResourceManagerId(); - - closeResourceManagerConnection( - new TimeoutException( - "The heartbeat of ResourceManager with id " + resourceId + " timed out.")); - - createResourceManagerConnection(resourceManagerAddress, resourceManagerId); + reconnectToResourceManager( + new JobMasterException( + String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId))); } }); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/06437970/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java new file mode 100644 index 0000000..d549f7b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/ResourceManagerAddress.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; + +import javax.annotation.Nonnull; + +import java.util.Objects; + +/** + * Current address and fencing token of the leading ResourceManager. 
+ */ +public class ResourceManagerAddress { + + @Nonnull + private final String address; + + @Nonnull + private final ResourceManagerId resourceManagerId; + + public ResourceManagerAddress(@Nonnull String address, @Nonnull ResourceManagerId resourceManagerId) { + this.address = address; + this.resourceManagerId = resourceManagerId; + } + + @Nonnull + public String getAddress() { + return address; + } + + @Nonnull + public ResourceManagerId getResourceManagerId() { + return resourceManagerId; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + ResourceManagerAddress that = (ResourceManagerAddress) obj; + return Objects.equals(address, that.address) && + Objects.equals(resourceManagerId, that.resourceManagerId); + } + + @Override + public int hashCode() { + return Objects.hash(address, resourceManagerId); + } + + @Override + public String toString() { + return address + '(' + resourceManagerId + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/06437970/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 09640d5..99cdc16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -70,6 +70,7 @@ import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; 
import org.hamcrest.Matchers; @@ -89,6 +90,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -525,6 +527,94 @@ public class JobMasterTest extends TestLogger { } } + /** + * Tests that we continue reconnecting to the latest known RM after a disconnection + * message. + */ + @Test + public void testReconnectionAfterDisconnect() throws Exception { + final JobMaster jobMaster = createJobMaster( + JobMasterConfiguration.fromConfiguration(configuration), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build()); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + jobMaster.start(jobMasterId, testingTimeout); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + final BlockingQueue<JobMasterId> registrationsQueue = new ArrayBlockingQueue<>(1); + + testingResourceManagerGateway.setRegisterJobManagerConsumer( + jobMasterIdResourceIDStringJobIDTuple4 -> registrationsQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0)); + + rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + + final ResourceManagerId resourceManagerId = testingResourceManagerGateway.getFencingToken(); + rmLeaderRetrievalService.notifyListener( + testingResourceManagerGateway.getAddress(), + resourceManagerId.toUUID()); + + // wait for first registration attempt + final JobMasterId firstRegistrationAttempt = registrationsQueue.take(); + + assertThat(firstRegistrationAttempt, equalTo(jobMasterId)); + + assertThat(registrationsQueue.isEmpty(), is(true)); + jobMasterGateway.disconnectResourceManager(resourceManagerId, new FlinkException("Test exception")); + + // 
wait for the second registration attempt after the disconnect call + assertThat(registrationsQueue.take(), equalTo(jobMasterId)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + + /** + * Tests that the a JM connects to the leading RM after regaining leadership. + */ + @Test + public void testResourceManagerConnectionAfterRegainingLeadership() throws Exception { + final JobMaster jobMaster = createJobMaster( + JobMasterConfiguration.fromConfiguration(configuration), + jobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build()); + + jobMaster.start(jobMasterId, testingTimeout); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + + final BlockingQueue<JobMasterId> registrationQueue = new ArrayBlockingQueue<>(1); + testingResourceManagerGateway.setRegisterJobManagerConsumer( + jobMasterIdResourceIDStringJobIDTuple4 -> registrationQueue.offer(jobMasterIdResourceIDStringJobIDTuple4.f0)); + + final String resourceManagerAddress = testingResourceManagerGateway.getAddress(); + rpcService.registerGateway(resourceManagerAddress, testingResourceManagerGateway); + + rmLeaderRetrievalService.notifyListener(resourceManagerAddress, testingResourceManagerGateway.getFencingToken().toUUID()); + + final JobMasterId firstRegistrationAttempt = registrationQueue.take(); + + assertThat(firstRegistrationAttempt, equalTo(jobMasterId)); + + jobMaster.suspend(new FlinkException("Test exception."), testingTimeout).get(); + + final JobMasterId jobMasterId2 = JobMasterId.generate(); + + jobMaster.start(jobMasterId2, testingTimeout).get(); + + final JobMasterId secondRegistrationAttempt = registrationQueue.take(); + + assertThat(secondRegistrationAttempt, equalTo(jobMasterId2)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + private File createSavepoint(long savepointId) throws IOException { final File savepointFile = 
temporaryFolder.newFile(); final SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());
