[FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to JobMaster and TaskExecutor
If a heartbeat timeout with the RM occurs on the JobMaster or the TaskExecutor, then they will both try to reconnect to the last known RM address. Additionally, we now respect the TaskManagerOptions#REGISTRATION_TIMEOUT on the TaskExecutor. This means that if the TaskExecutor could not register at an RM within the given registration timeout, it will fail with a fatal exception. This allows the TaskExecutor process to fail if it cannot establish a connection, which ultimately frees the occupied resources. The commit also changes the default value for TaskManagerOptions#REGISTRATION_TIMEOUT from "Inf" to "5 min". This closes #6035. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c832f52a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c832f52a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c832f52a Branch: refs/heads/master Commit: c832f52a9ca489b72e3eddcb51c288d23d66b571 Parents: 15cdc5c Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu May 17 14:44:14 2018 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu May 17 19:49:44 2018 +0200 ---------------------------------------------------------------------- .../generated/task_manager_configuration.html | 2 +- .../flink/configuration/TaskManagerOptions.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 41 ++++++--- .../runtime/taskexecutor/TaskExecutor.java | 85 +++++++++++++---- .../taskexecutor/TaskManagerConfiguration.java | 7 +- .../RegistrationTimeoutException.java | 41 +++++++++ .../flink/runtime/jobmaster/JobMasterTest.java | 20 ++-- .../utils/TestingResourceManagerGateway.java | 4 + .../runtime/taskexecutor/TaskExecutorTest.java | 96 ++++++++++++++++++++ 9 files changed, 257 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/docs/_includes/generated/task_manager_configuration.html 
---------------------------------------------------------------------- diff --git a/docs/_includes/generated/task_manager_configuration.html b/docs/_includes/generated/task_manager_configuration.html index fe999a8..fdb975f 100644 --- a/docs/_includes/generated/task_manager_configuration.html +++ b/docs/_includes/generated/task_manager_configuration.html @@ -154,7 +154,7 @@ </tr> <tr> <td><h5>taskmanager.registration.timeout</h5></td> - <td style="word-wrap: break-word;">"Inf"</td> + <td style="word-wrap: break-word;">"5 min"</td> <td>Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.</td> </tr> <tr> http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index b726e07..8017f7a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -139,7 +139,7 @@ public class TaskManagerOptions { */ public static final ConfigOption<String> REGISTRATION_TIMEOUT = key("taskmanager.registration.timeout") - .defaultValue("Inf") + .defaultValue("5 min") .withDeprecatedKeys("taskmanager.maxRegistrationDuration") .withDescription("Defines the timeout for the TaskManager registration. 
If the duration is" + " exceeded without a successful registration, then the TaskManager terminates."); http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index f1dbbb4..5f5a9a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1260,20 +1260,24 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast } if (resourceManagerAddress != null) { - log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); + createResourceManagerConnection(resourceManagerAddress, resourceManagerId); + } + } - resourceManagerConnection = new ResourceManagerConnection( - log, - jobGraph.getJobID(), - resourceId, - getAddress(), - getFencingToken(), - resourceManagerAddress, - resourceManagerId, - scheduledExecutorService); + private void createResourceManagerConnection(String resourceManagerAddress, ResourceManagerId resourceManagerId) { + log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); - resourceManagerConnection.start(); - } + resourceManagerConnection = new ResourceManagerConnection( + log, + jobGraph.getJobID(), + resourceId, + getAddress(), + getFencingToken(), + resourceManagerAddress, + resourceManagerId, + scheduledExecutorService); + + resourceManagerConnection.start(); } private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) { @@ -1605,9 +1609,16 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast runAsync(() -> { log.info("The heartbeat of ResourceManager with id {} timed out.", 
resourceId); - closeResourceManagerConnection( - new TimeoutException( - "The heartbeat of ResourceManager with id " + resourceId + " timed out.")); + if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId)) { + final String resourceManagerAddress = establishedResourceManagerConnection.getResourceManagerGateway().getAddress(); + final ResourceManagerId resourceManagerId = establishedResourceManagerConnection.getResourceManagerId(); + + closeResourceManagerConnection( + new TimeoutException( + "The heartbeat of ResourceManager with id " + resourceId + " timed out.")); + + createResourceManagerConnection(resourceManagerAddress, resourceManagerId); + } }); } http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 6fdb2bd..e87c44d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -78,6 +78,7 @@ import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImpl; import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException; import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException; +import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskException; @@ -101,6 +102,9 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -187,6 +191,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private FileCache fileCache; + @Nullable + private UUID currentRegistrationTimeoutId; + public TaskExecutor( RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, @@ -232,8 +239,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { rpcService.getScheduledExecutor(), log); - hardwareDescription = HardwareDescription.extractFromSystem( + this.hardwareDescription = HardwareDescription.extractFromSystem( taskExecutorServices.getMemoryManager().getMemorySize()); + + this.currentRegistrationTimeoutId = null; } // ------------------------------------------------------------------------ @@ -258,6 +267,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService()); + + startRegistrationTimeout(); } /** @@ -269,7 +280,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { Throwable throwable = null; - if (isConnectedToResourceManager()) { + if (resourceManagerConnection != null) { resourceManagerConnection.close(); } @@ -876,24 +887,28 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { // establish a connection to the new leader if (newLeaderAddress != null) { - log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId); - resourceManagerConnection = - new TaskExecutorToResourceManagerConnection( - log, 
- getRpcService(), - getAddress(), - getResourceID(), - taskSlotTable.createSlotReport(getResourceID()), - taskManagerLocation.dataPort(), - hardwareDescription, - newLeaderAddress, - newResourceManagerId, - getMainThreadExecutor(), - new ResourceManagerRegistrationListener()); - resourceManagerConnection.start(); + createResourceManagerConnection(newLeaderAddress, newResourceManagerId); } } + private void createResourceManagerConnection(String newLeaderAddress, ResourceManagerId newResourceManagerId) { + log.info("Attempting to register at ResourceManager {} ({})", newLeaderAddress, newResourceManagerId); + resourceManagerConnection = + new TaskExecutorToResourceManagerConnection( + log, + getRpcService(), + getAddress(), + getResourceID(), + taskSlotTable.createSlotReport(getResourceID()), + taskManagerLocation.dataPort(), + hardwareDescription, + newLeaderAddress, + newResourceManagerId, + getMainThreadExecutor(), + new ResourceManagerRegistrationListener()); + resourceManagerConnection.start(); + } + private void establishResourceManagerConnection( ResourceID resourceManagerResourceId, ClusterInformation clusterInformation) { @@ -917,6 +932,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { clusterInformation.getBlobServerPort()); blobCacheService.setBlobServerAddress(blobServerAddress); + + stopRegistrationTimeout(); } private void closeResourceManagerConnection(Exception cause) { @@ -947,6 +964,34 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { resourceManagerConnection.close(); resourceManagerConnection = null; } + + startRegistrationTimeout(); + } + + private void startRegistrationTimeout() { + final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration(); + + if (maxRegistrationDuration != null) { + final UUID newRegistrationTimeoutId = UUID.randomUUID(); + currentRegistrationTimeoutId = newRegistrationTimeoutId; + scheduleRunAsync(() -> 
registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration); + } + } + + private void stopRegistrationTimeout() { + currentRegistrationTimeoutId = null; + } + + private void registrationTimeout(@Nonnull UUID registrationTimeoutId) { + if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) { + final Time maxRegistrationDuration = taskManagerConfiguration.getMaxRegistrationDuration(); + + onFatalError( + new RegistrationTimeoutException( + String.format("Could not register at the ResourceManager within the specified maximum " + + "registration duration %s. This indicates a problem with this instance. Terminating now.", + maxRegistrationDuration))); + } } // ------------------------------------------------------------------------ @@ -1556,9 +1601,15 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { if (resourceManagerConnection != null && resourceManagerConnection.getResourceManagerId().equals(resourceId)) { log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId); + final String resourceManagerAddress = resourceManagerConnection.getTargetAddress(); + final ResourceManagerId resourceManagerId = resourceManagerConnection.getTargetLeaderId(); + closeResourceManagerConnection( new TimeoutException( "The heartbeat of ResourceManager with id " + resourceId + " timed out.")); + + // try to reconnect to old ResourceManager + createResourceManagerConnection(resourceManagerAddress, resourceManagerId); } else { log.debug("Received heartbeat timeout for outdated ResourceManager id {}. 
Ignoring the timeout.", resourceId); } http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java index e8a7ae8..91beabf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -50,8 +50,11 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { private final String[] tmpDirectories; private final Time timeout; + // null indicates an infinite duration + @Nullable private final Time maxRegistrationDuration; + private final Time initialRegistrationPause; private final Time maxRegistrationPause; private final Time refusedRegistrationPause; @@ -74,7 +77,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { int numberSlots, String[] tmpDirectories, Time timeout, - Time maxRegistrationDuration, + @Nullable Time maxRegistrationDuration, Time initialRegistrationPause, Time maxRegistrationPause, Time refusedRegistrationPause, @@ -108,6 +111,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { return timeout; } + @Nullable public Time getMaxRegistrationDuration() { return maxRegistrationDuration; } @@ -116,6 +120,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo { return initialRegistrationPause; } + @Nullable public Time getMaxRegistrationPause() { return maxRegistrationPause; } http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java new file mode 100644 index 0000000..561089a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.exceptions; + +import org.apache.flink.runtime.taskexecutor.TaskExecutor; + +/** + * Exception which indicates that the {@link TaskExecutor} could not register at + * the master in time. 
+ */ +public class RegistrationTimeoutException extends TaskManagerException { + private static final long serialVersionUID = -6377818046575001931L; + + public RegistrationTimeoutException(String message) { + super(message); + } + + public RegistrationTimeoutException(String message, Throwable cause) { + super(message, cause); + } + + public RegistrationTimeoutException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index f2de134..09640d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -90,6 +90,7 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; @@ -243,17 +244,21 @@ public class JobMasterTest extends TestLogger { resourceManagerId, rmResourceId, fastHeartbeatInterval, - "localhost", + resourceManagerAddress, "localhost"); final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> jobManagerRegistrationFuture = new CompletableFuture<>(); final CompletableFuture<JobID> disconnectedJobManagerFuture = new CompletableFuture<>(); + final CountDownLatch registrationAttempts = new CountDownLatch(2); - resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> jobManagerRegistrationFuture.complete( - Tuple3.of( - tuple.f0, - tuple.f1, - tuple.f3))); + resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> 
{ + jobManagerRegistrationFuture.complete( + Tuple3.of( + tuple.f0, + tuple.f1, + tuple.f3)); + registrationAttempts.countDown(); + }); resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> disconnectedJobManagerFuture.complete(tuple.f0)); @@ -286,6 +291,9 @@ public class JobMasterTest extends TestLogger { // heartbeat timeout should trigger disconnect JobManager from ResourceManager assertThat(disconnectedJobManager, Matchers.equalTo(jobGraph.getJobID())); + + // the JobMaster should try to reconnect to the RM + registrationAttempts.await(); } finally { jobManagerSharedServices.shutdown(); RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index 9b40414..106fd73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -109,6 +109,10 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { this.requestSlotConsumer = null; } + public ResourceID getOwnResourceId() { + return ownResourceId; + } + public void setRequestSlotFuture(CompletableFuture<Acknowledge> slotFuture) { this.slotFutureReference.set(slotFuture); } http://git-wip-us.apache.org/repos/asf/flink/blob/c832f52a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 6b3c8f6..ca31f76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -76,6 +78,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; +import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -87,6 +90,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -113,12 +117,15 @@ import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -302,9 +309,11 @@ public class TaskExecutorTest extends TestLogger { new ClusterInformation("localhost", 1234)); final CompletableFuture<ResourceID> taskExecutorRegistrationFuture = new CompletableFuture<>(); + final CountDownLatch registrationAttempts = new CountDownLatch(2); rmGateway.setRegisterTaskExecutorFunction( registration -> { taskExecutorRegistrationFuture.complete(registration.f1); + registrationAttempts.countDown(); return CompletableFuture.completedFuture(registrationResponse); }); @@ -353,6 +362,9 @@ public class TaskExecutorTest extends TestLogger { // heartbeat timeout should trigger disconnect TaskManager from ResourceManager assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID())); + // the TaskExecutor should try to reconnect to the RM + registrationAttempts.await(); + } finally { RpcUtils.terminateRpcEndpoint(taskManager, timeout); } @@ -1387,6 +1399,90 @@ public class TaskExecutorTest extends TestLogger { } } + @Test + public void testMaximumRegistrationDuration() throws Exception { + configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms"); + + final TaskExecutor taskExecutor = new TaskExecutor( + rpc, + TaskManagerConfiguration.fromConfiguration(configuration), + haServices, + new TaskManagerServicesBuilder().build(), + new HeartbeatServices(1000L, 1000L), + 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + dummyBlobCacheService, + testingFatalErrorHandler); + + taskExecutor.start(); + + try { + final Throwable error = testingFatalErrorHandler.getErrorFuture().get(); + assertThat(error, is(notNullValue())); + assertThat(ExceptionUtils.stripExecutionException(error), instanceOf(RegistrationTimeoutException.class)); + + testingFatalErrorHandler.clearError(); + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + + @Test + public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception { + configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "100 ms"); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService); + + final long heartbeatInterval = 10L; + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build(); + final TaskExecutor taskExecutor = new TaskExecutor( + rpc, + TaskManagerConfiguration.fromConfiguration(configuration), + haServices, + taskManagerServices, + new HeartbeatServices(heartbeatInterval, 10L), + UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), + dummyBlobCacheService, + testingFatalErrorHandler); + + taskExecutor.start(); + + final CompletableFuture<ResourceID> registrationFuture = new CompletableFuture<>(); + final OneShotLatch secondRegistration = new OneShotLatch(); + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + testingResourceManagerGateway.setRegisterTaskExecutorFunction( + tuple -> { + if (registrationFuture.complete(tuple.f1)) { + return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess( + new InstanceID(), + testingResourceManagerGateway.getOwnResourceId(), + heartbeatInterval, + new ClusterInformation("localhost", 1234))); + } else { + secondRegistration.trigger(); + return 
CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the first registration should succeed.")); + } + } + ); + rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + + resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), UUID.randomUUID()); + + final ResourceID registrationResourceId = registrationFuture.get(); + + assertThat(registrationResourceId, equalTo(taskManagerServices.getTaskManagerLocation().getResourceID())); + + secondRegistration.await(); + + final Throwable error = testingFatalErrorHandler.getErrorFuture().get(); + assertThat(error, is(notNullValue())); + assertThat(ExceptionUtils.stripExecutionException(error), instanceOf(RegistrationTimeoutException.class)); + + testingFatalErrorHandler.clearError(); + } finally { + RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); + } + } + private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService { private final CompletableFuture<LeaderRetrievalListener> startFuture;