[FLINK-6160] Add reconnection attempts in case of heartbeat timeouts to 
JobMaster and TaskExecutor

If a timeout with the RM occurs on on the JobMaster and TaskExecutor, then they 
will both try to reconnect
to the last known RM address.

Additionally, we now respect the TaskManagerOption#REGISTRATION_TIMEOUT on the 
TaskExecutor. This means that
if the TaskExecutor could not register at a RM within the given registration 
timeout, it will fail with a
fatal exception. This allows to fail the TaskExecutor process in case that it 
cannot establish a connection
and ultimately frees the occupied resources.

The commit also changes the default value for 
TaskManagerOption#REGISTRATION_TIMEOUT from "Inf" to "5 min".

This closes #6035.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba379ec0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba379ec0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba379ec0

Branch: refs/heads/release-1.5
Commit: ba379ec0ee451fee807798094f2948741489a263
Parents: 884c2e3
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu May 17 14:44:14 2018 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri May 18 01:03:54 2018 +0200

----------------------------------------------------------------------
 .../generated/task_manager_configuration.html   |  2 +-
 .../flink/configuration/TaskManagerOptions.java |  2 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 41 ++++++---
 .../runtime/taskexecutor/TaskExecutor.java      | 85 +++++++++++++----
 .../taskexecutor/TaskManagerConfiguration.java  |  7 +-
 .../RegistrationTimeoutException.java           | 41 +++++++++
 .../flink/runtime/jobmaster/JobMasterTest.java  | 20 ++--
 .../utils/TestingResourceManagerGateway.java    |  4 +
 .../runtime/taskexecutor/TaskExecutorTest.java  | 96 ++++++++++++++++++++
 9 files changed, 257 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/docs/_includes/generated/task_manager_configuration.html
----------------------------------------------------------------------
diff --git a/docs/_includes/generated/task_manager_configuration.html 
b/docs/_includes/generated/task_manager_configuration.html
index fe999a8..fdb975f 100644
--- a/docs/_includes/generated/task_manager_configuration.html
+++ b/docs/_includes/generated/task_manager_configuration.html
@@ -154,7 +154,7 @@
         </tr>
         <tr>
             <td><h5>taskmanager.registration.timeout</h5></td>
-            <td style="word-wrap: break-word;">"Inf"</td>
+            <td style="word-wrap: break-word;">"5 min"</td>
             <td>Defines the timeout for the TaskManager registration. If the 
duration is exceeded without a successful registration, then the TaskManager 
terminates.</td>
         </tr>
         <tr>

http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b726e07..8017f7a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -139,7 +139,7 @@ public class TaskManagerOptions {
         */
        public static final ConfigOption<String> REGISTRATION_TIMEOUT =
                key("taskmanager.registration.timeout")
-                       .defaultValue("Inf")
+                       .defaultValue("5 min")
                        
.withDeprecatedKeys("taskmanager.maxRegistrationDuration")
                        .withDescription("Defines the timeout for the 
TaskManager registration. If the duration is" +
                                " exceeded without a successful registration, 
then the TaskManager terminates.");

http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index f1dbbb4..5f5a9a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1260,20 +1260,24 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                }
 
                if (resourceManagerAddress != null) {
-                       log.info("Attempting to register at ResourceManager 
{}", resourceManagerAddress);
+                       createResourceManagerConnection(resourceManagerAddress, 
resourceManagerId);
+               }
+       }
 
-                       resourceManagerConnection = new 
ResourceManagerConnection(
-                               log,
-                               jobGraph.getJobID(),
-                               resourceId,
-                               getAddress(),
-                               getFencingToken(),
-                               resourceManagerAddress,
-                               resourceManagerId,
-                               scheduledExecutorService);
+       private void createResourceManagerConnection(String 
resourceManagerAddress, ResourceManagerId resourceManagerId) {
+               log.info("Attempting to register at ResourceManager {}", 
resourceManagerAddress);
 
-                       resourceManagerConnection.start();
-               }
+               resourceManagerConnection = new ResourceManagerConnection(
+                       log,
+                       jobGraph.getJobID(),
+                       resourceId,
+                       getAddress(),
+                       getFencingToken(),
+                       resourceManagerAddress,
+                       resourceManagerId,
+                       scheduledExecutorService);
+
+               resourceManagerConnection.start();
        }
 
        private void establishResourceManagerConnection(final 
JobMasterRegistrationSuccess success) {
@@ -1605,9 +1609,16 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        runAsync(() -> {
                                log.info("The heartbeat of ResourceManager with 
id {} timed out.", resourceId);
 
-                               closeResourceManagerConnection(
-                                       new TimeoutException(
-                                               "The heartbeat of 
ResourceManager with id " + resourceId + " timed out."));
+                               if (establishedResourceManagerConnection != 
null && 
establishedResourceManagerConnection.getResourceManagerResourceID().equals(resourceId))
 {
+                                       final String resourceManagerAddress = 
establishedResourceManagerConnection.getResourceManagerGateway().getAddress();
+                                       final ResourceManagerId 
resourceManagerId = establishedResourceManagerConnection.getResourceManagerId();
+
+                                       closeResourceManagerConnection(
+                                               new TimeoutException(
+                                                       "The heartbeat of 
ResourceManager with id " + resourceId + " timed out."));
+
+                                       
createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
+                               }
                        });
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 000d0c2..3aa046d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -77,6 +77,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
@@ -100,6 +101,9 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -184,6 +188,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
        private final HardwareDescription hardwareDescription;
 
+       @Nullable
+       private UUID currentRegistrationTimeoutId;
+
        public TaskExecutor(
                        RpcService rpcService,
                        TaskManagerConfiguration taskManagerConfiguration,
@@ -229,8 +236,10 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        rpcService.getScheduledExecutor(),
                        log);
 
-               hardwareDescription = HardwareDescription.extractFromSystem(
+               this.hardwareDescription = 
HardwareDescription.extractFromSystem(
                        
taskExecutorServices.getMemoryManager().getMemorySize());
+
+               this.currentRegistrationTimeoutId = null;
        }
 
        // 
------------------------------------------------------------------------
@@ -253,6 +262,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                // start the job leader service
                jobLeaderService.start(getAddress(), getRpcService(), 
haServices, new JobLeaderListenerImpl());
+
+               startRegistrationTimeout();
        }
 
        /**
@@ -264,7 +275,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                Throwable throwable = null;
 
-               if (isConnectedToResourceManager()) {
+               if (resourceManagerConnection != null) {
                        resourceManagerConnection.close();
                }
 
@@ -870,24 +881,28 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                // establish a connection to the new leader
                if (newLeaderAddress != null) {
-                       log.info("Attempting to register at ResourceManager {} 
({})", newLeaderAddress, newResourceManagerId);
-                       resourceManagerConnection =
-                               new TaskExecutorToResourceManagerConnection(
-                                       log,
-                                       getRpcService(),
-                                       getAddress(),
-                                       getResourceID(),
-                                       
taskSlotTable.createSlotReport(getResourceID()),
-                                       taskManagerLocation.dataPort(),
-                                       hardwareDescription,
-                                       newLeaderAddress,
-                                       newResourceManagerId,
-                                       getMainThreadExecutor(),
-                                       new 
ResourceManagerRegistrationListener());
-                       resourceManagerConnection.start();
+                       createResourceManagerConnection(newLeaderAddress, 
newResourceManagerId);
                }
        }
 
+       private void createResourceManagerConnection(String newLeaderAddress, 
ResourceManagerId newResourceManagerId) {
+               log.info("Attempting to register at ResourceManager {} ({})", 
newLeaderAddress, newResourceManagerId);
+               resourceManagerConnection =
+                       new TaskExecutorToResourceManagerConnection(
+                               log,
+                               getRpcService(),
+                               getAddress(),
+                               getResourceID(),
+                               taskSlotTable.createSlotReport(getResourceID()),
+                               taskManagerLocation.dataPort(),
+                               hardwareDescription,
+                               newLeaderAddress,
+                               newResourceManagerId,
+                               getMainThreadExecutor(),
+                               new ResourceManagerRegistrationListener());
+               resourceManagerConnection.start();
+       }
+
        private void establishResourceManagerConnection(
                        ResourceID resourceManagerResourceId,
                        ClusterInformation clusterInformation) {
@@ -911,6 +926,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        clusterInformation.getBlobServerPort());
 
                blobCacheService.setBlobServerAddress(blobServerAddress);
+
+               stopRegistrationTimeout();
        }
 
        private void closeResourceManagerConnection(Exception cause) {
@@ -941,6 +958,34 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        resourceManagerConnection.close();
                        resourceManagerConnection = null;
                }
+
+               startRegistrationTimeout();
+       }
+
+       private void startRegistrationTimeout() {
+               final Time maxRegistrationDuration = 
taskManagerConfiguration.getMaxRegistrationDuration();
+
+               if (maxRegistrationDuration != null) {
+                       final UUID newRegistrationTimeoutId = UUID.randomUUID();
+                       currentRegistrationTimeoutId = newRegistrationTimeoutId;
+                       scheduleRunAsync(() -> 
registrationTimeout(newRegistrationTimeoutId), maxRegistrationDuration);
+               }
+       }
+
+       private void stopRegistrationTimeout() {
+               currentRegistrationTimeoutId = null;
+       }
+
+       private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
+               if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) 
{
+                       final Time maxRegistrationDuration = 
taskManagerConfiguration.getMaxRegistrationDuration();
+
+                       onFatalError(
+                               new RegistrationTimeoutException(
+                                       String.format("Could not register at 
the ResourceManager within the specified maximum " +
+                                               "registration duration %s. This 
indicates a problem with this instance. Terminating now.",
+                                               maxRegistrationDuration)));
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -1550,9 +1595,15 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                if (resourceManagerConnection != null && 
resourceManagerConnection.getResourceManagerId().equals(resourceId)) {
                                        log.info("The heartbeat of 
ResourceManager with id {} timed out.", resourceId);
 
+                                       final String resourceManagerAddress = 
resourceManagerConnection.getTargetAddress();
+                                       final ResourceManagerId 
resourceManagerId = resourceManagerConnection.getTargetLeaderId();
+
                                        closeResourceManagerConnection(
                                                new TimeoutException(
                                                        "The heartbeat of 
ResourceManager with id " + resourceId + " timed out."));
+
+                                       // try to reconnect to old 
ResourceManager
+                                       
createResourceManagerConnection(resourceManagerAddress, resourceManagerId);
                                } else {
                                        log.debug("Received heartbeat timeout 
for outdated ResourceManager id {}. Ignoring the timeout.", resourceId);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index e8a7ae8..91beabf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -50,8 +50,11 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
        private final String[] tmpDirectories;
 
        private final Time timeout;
+
        // null indicates an infinite duration
+       @Nullable
        private final Time maxRegistrationDuration;
+
        private final Time initialRegistrationPause;
        private final Time maxRegistrationPause;
        private final Time refusedRegistrationPause;
@@ -74,7 +77,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                int numberSlots,
                String[] tmpDirectories,
                Time timeout,
-               Time maxRegistrationDuration,
+               @Nullable Time maxRegistrationDuration,
                Time initialRegistrationPause,
                Time maxRegistrationPause,
                Time refusedRegistrationPause,
@@ -108,6 +111,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                return timeout;
        }
 
+       @Nullable
        public Time getMaxRegistrationDuration() {
                return maxRegistrationDuration;
        }
@@ -116,6 +120,7 @@ public class TaskManagerConfiguration implements 
TaskManagerRuntimeInfo {
                return initialRegistrationPause;
        }
 
+       @Nullable
        public Time getMaxRegistrationPause() {
                return maxRegistrationPause;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
new file mode 100644
index 0000000..561089a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/RegistrationTimeoutException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+
+/**
+ * Exception which indicates that the {@link TaskExecutor} could not register 
at
+ * the master in time.
+ */
+public class RegistrationTimeoutException extends TaskManagerException {
+       private static final long serialVersionUID = -6377818046575001931L;
+
+       public RegistrationTimeoutException(String message) {
+               super(message);
+       }
+
+       public RegistrationTimeoutException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public RegistrationTimeoutException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f2de134..09640d5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -90,6 +90,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -243,17 +244,21 @@ public class JobMasterTest extends TestLogger {
                        resourceManagerId,
                        rmResourceId,
                        fastHeartbeatInterval,
-                       "localhost",
+                       resourceManagerAddress,
                        "localhost");
 
                final CompletableFuture<Tuple3<JobMasterId, ResourceID, JobID>> 
jobManagerRegistrationFuture = new CompletableFuture<>();
                final CompletableFuture<JobID> disconnectedJobManagerFuture = 
new CompletableFuture<>();
+               final CountDownLatch registrationAttempts = new 
CountDownLatch(2);
 
-               resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> 
jobManagerRegistrationFuture.complete(
-                       Tuple3.of(
-                               tuple.f0,
-                               tuple.f1,
-                               tuple.f3)));
+               resourceManagerGateway.setRegisterJobManagerConsumer(tuple -> {
+                       jobManagerRegistrationFuture.complete(
+                               Tuple3.of(
+                                       tuple.f0,
+                                       tuple.f1,
+                                       tuple.f3));
+                       registrationAttempts.countDown();
+               });
 
                resourceManagerGateway.setDisconnectJobManagerConsumer(tuple -> 
disconnectedJobManagerFuture.complete(tuple.f0));
 
@@ -286,6 +291,9 @@ public class JobMasterTest extends TestLogger {
 
                        // heartbeat timeout should trigger disconnect 
JobManager from ResourceManager
                        assertThat(disconnectedJobManager, 
Matchers.equalTo(jobGraph.getJobID()));
+
+                       // the JobMaster should try to reconnect to the RM
+                       registrationAttempts.await();
                } finally {
                        jobManagerSharedServices.shutdown();
                        RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index 9b40414..106fd73 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -109,6 +109,10 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
                this.requestSlotConsumer = null;
        }
 
+       public ResourceID getOwnResourceId() {
+               return ownResourceId;
+       }
+
        public void setRequestSlotFuture(CompletableFuture<Acknowledge> 
slotFuture) {
                this.slotFutureReference.set(slotFuture);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba379ec0/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6b3c8f6..ca31f76 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -76,6 +78,7 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -87,6 +90,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
@@ -113,12 +117,15 @@ import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -302,9 +309,11 @@ public class TaskExecutorTest extends TestLogger {
                        new ClusterInformation("localhost", 1234));
 
                final CompletableFuture<ResourceID> 
taskExecutorRegistrationFuture = new CompletableFuture<>();
+               final CountDownLatch registrationAttempts = new 
CountDownLatch(2);
                rmGateway.setRegisterTaskExecutorFunction(
                        registration -> {
                                
taskExecutorRegistrationFuture.complete(registration.f1);
+                               registrationAttempts.countDown();
                                return 
CompletableFuture.completedFuture(registrationResponse);
                        });
 
@@ -353,6 +362,9 @@ public class TaskExecutorTest extends TestLogger {
                        // heartbeat timeout should trigger disconnect 
TaskManager from ResourceManager
                        
assertThat(taskExecutorDisconnectFuture.get(heartbeatTimeout * 50L, 
TimeUnit.MILLISECONDS), equalTo(taskManagerLocation.getResourceID()));
 
+                       // the TaskExecutor should try to reconnect to the RM
+                       registrationAttempts.await();
+
                } finally {
                        RpcUtils.terminateRpcEndpoint(taskManager, timeout);
                }
@@ -1387,6 +1399,90 @@ public class TaskExecutorTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testMaximumRegistrationDuration() throws Exception {
+               
configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
+
+               final TaskExecutor taskExecutor = new TaskExecutor(
+                       rpc,
+                       
TaskManagerConfiguration.fromConfiguration(configuration),
+                       haServices,
+                       new TaskManagerServicesBuilder().build(),
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                       dummyBlobCacheService,
+                       testingFatalErrorHandler);
+
+               taskExecutor.start();
+
+               try {
+                       final Throwable error = 
testingFatalErrorHandler.getErrorFuture().get();
+                       assertThat(error, is(notNullValue()));
+                       
assertThat(ExceptionUtils.stripExecutionException(error), 
instanceOf(RegistrationTimeoutException.class));
+
+                       testingFatalErrorHandler.clearError();
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+               }
+       }
+
+       @Test
+       public void testMaximumRegistrationDurationAfterConnectionLoss() throws 
Exception {
+               
configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "100 ms");
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), timerService);
+
+               final long heartbeatInterval = 10L;
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build();
+               final TaskExecutor taskExecutor = new TaskExecutor(
+                       rpc,
+                       
TaskManagerConfiguration.fromConfiguration(configuration),
+                       haServices,
+                       taskManagerServices,
+                       new HeartbeatServices(heartbeatInterval, 10L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                       dummyBlobCacheService,
+                       testingFatalErrorHandler);
+
+               taskExecutor.start();
+
+               final CompletableFuture<ResourceID> registrationFuture = new 
CompletableFuture<>();
+               final OneShotLatch secondRegistration = new OneShotLatch();
+               try {
+                       final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+                       
testingResourceManagerGateway.setRegisterTaskExecutorFunction(
+                               tuple -> {
+                                       if 
(registrationFuture.complete(tuple.f1)) {
+                                               return 
CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
+                                                       new InstanceID(),
+                                                       
testingResourceManagerGateway.getOwnResourceId(),
+                                                       heartbeatInterval,
+                                                       new 
ClusterInformation("localhost", 1234)));
+                                       } else {
+                                               secondRegistration.trigger();
+                                               return 
CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the 
first registration should succeed."));
+                                       }
+                               }
+                       );
+                       
rpc.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+
+                       
resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(),
 UUID.randomUUID());
+
+                       final ResourceID registrationResourceId = 
registrationFuture.get();
+
+                       assertThat(registrationResourceId, 
equalTo(taskManagerServices.getTaskManagerLocation().getResourceID()));
+
+                       secondRegistration.await();
+
+                       final Throwable error = 
testingFatalErrorHandler.getErrorFuture().get();
+                       assertThat(error, is(notNullValue()));
+                       
assertThat(ExceptionUtils.stripExecutionException(error), 
instanceOf(RegistrationTimeoutException.class));
+
+                       testingFatalErrorHandler.clearError();
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+               }
+       }
+
        private static final class StartStopNotifyingLeaderRetrievalService 
implements LeaderRetrievalService {
                private final CompletableFuture<LeaderRetrievalListener> 
startFuture;
 

Reply via email to