ifndef-SleePy closed pull request #7290: [FLINK-11137] [runtime] Fix unexpected 
RegistrationTimeoutException of TaskExecutor
URL: https://github.com/apache/flink/pull/7290
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise show it after the merge:

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a4548c11363..3403f0e6635 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -272,6 +272,7 @@ public TaskExecutor(
        public void start() throws Exception {
                super.start();
 
+               startRegistrationTimeout();
                // start by connecting to the ResourceManager
                try {
                        resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
@@ -286,8 +287,6 @@ public void start() throws Exception {
                jobLeaderService.start(getAddress(), getRpcService(), 
haServices, new JobLeaderListenerImpl());
 
                fileCache = new 
FileCache(taskManagerConfiguration.getTmpDirectories(), 
blobCacheService.getPermanentBlobService());
-
-               startRegistrationTimeout();
        }
 
        /**
@@ -1024,11 +1023,13 @@ private void startRegistrationTimeout() {
                }
        }
 
-       private void stopRegistrationTimeout() {
+       @VisibleForTesting
+       void stopRegistrationTimeout() {
                currentRegistrationTimeoutId = null;
        }
 
-       private void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
+       @VisibleForTesting
+       void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
                if (registrationTimeoutId.equals(currentRegistrationTimeoutId)) 
{
                        final Time maxRegistrationDuration = 
taskManagerConfiguration.getMaxRegistrationDuration();
 
@@ -1480,6 +1481,11 @@ TaskExecutorToResourceManagerConnection 
getResourceManagerConnection() {
                return resourceManagerHeartbeatManager;
        }
 
+       @VisibleForTesting
+       UUID getCurrentRegistrationTimeoutId() {
+               return currentRegistrationTimeoutId;
+       }
+
        // 
------------------------------------------------------------------------
        //  Utility classes
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 5ceb8236691..96315650b7d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -67,18 +67,21 @@
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
+import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -109,6 +112,7 @@
 import org.slf4j.Logger;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
@@ -1698,6 +1702,84 @@ public void testOfferSlotToJobMasterAfterTimeout() 
throws Exception {
                }
        }
 
+       @Test
+       public void testRegistrationToRMFast() throws Exception {
+               final String resourceManagerAddress = 
"/resource/manager/address/one";
+               final ResourceID resourceManagerResourceId = new 
ResourceID(resourceManagerAddress);
+               final String dispatcherAddress = "localhost";
+               final String jobManagerAddress = "localhost";
+               final String webMonitorAddress = "localhost";
+
+               final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+               testingResourceManagerGateway.setRegisterTaskExecutorFunction(
+                       ignore -> CompletableFuture.completedFuture(
+                               new TaskExecutorRegistrationSuccess(
+                                       new InstanceID(),
+                                       resourceManagerResourceId,
+                                       new ClusterInformation("localhost", 
1234))));
+
+               testingResourceManagerGateway.setSendSlotReportFunction(
+                       resourceIDInstanceIDSlotReportTuple3 -> 
CompletableFuture.completedFuture(Acknowledge.get()));
+
+               rpc.registerGateway(resourceManagerAddress, 
testingResourceManagerGateway);
+
+               StandaloneHaServices haServices = new StandaloneHaServices(
+                       resourceManagerAddress,
+                       dispatcherAddress,
+                       jobManagerAddress,
+                       webMonitorAddress);
+
+               final CompletableFuture<Boolean> stopRegistrationTimeoutFuture 
= new CompletableFuture<>();
+
+               // The main thread would be blocked until registration 
succeeded.
+               // To simulate that registration is too fast to main thread.
+               final BlockingTaskSlotTable taskSlotTable = new 
BlockingTaskSlotTable(
+                       Collections.singletonList(ResourceProfile.UNKNOWN),
+                       new TimerService<>(TestingUtils.defaultExecutor(), 
timeout.toMilliseconds()),
+                       stopRegistrationTimeoutFuture,
+                       // Set a timeout limit to make sure if there are some 
unexpected codes,
+                       // the test case would fail due to timeout, instead of 
hanging forever.
+                       60000);
+
+               TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
+                       false,
+                       new File[]{tmp.newFolder()},
+                       Executors.directExecutor());
+
+               final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
+                       .setTaskManagerLocation(taskManagerLocation)
+                       .setTaskSlotTable(taskSlotTable)
+                       .setTaskStateManager(localStateStoresManager)
+                       .build();
+
+               // Use a future instead of a static timeout configuration.
+               final Configuration configuration = new Configuration();
+               
configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "0 s");
+
+               TaskExecutor taskManager = new 
TaskExecutorWithNotificationOfStoppingRegistrationTimeout(
+                       rpc,
+                       
TaskManagerConfiguration.fromConfiguration(configuration),
+                       haServices,
+                       taskManagerServices,
+                       new HeartbeatServices(1000L, 1000L),
+                       
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                       null,
+                       dummyBlobCacheService,
+                       testingFatalErrorHandler,
+                       stopRegistrationTimeoutFuture);
+
+               try {
+                       taskManager.start();
+
+                       
assertNull(taskManager.getCurrentRegistrationTimeoutId());
+
+                       
assertNotNull(taskManager.getResourceManagerConnection());
+               } finally {
+                       taskManager.shutDown();
+                       
taskManager.getTerminationFuture().get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+               }
+       }
+
        @Nonnull
        private TaskExecutor createTaskExecutor(TaskManagerServices 
taskManagerServices) {
                return new TaskExecutor(
@@ -1808,4 +1890,92 @@ public void monitorTarget(ResourceID resourceID, 
HeartbeatTarget<O> heartbeatTar
                        monitoredTargets.offer(resourceID);
                }
        }
+
+       /**
+        * This class provides a way to get notification when stopping 
registration timeout checking.
+        */
+       private static final class 
TaskExecutorWithNotificationOfStoppingRegistrationTimeout extends TaskExecutor {
+
+               private final CompletableFuture<Boolean> stopRegistrationFuture;
+
+               TaskExecutorWithNotificationOfStoppingRegistrationTimeout(
+                       RpcService rpcService,
+                       TaskManagerConfiguration taskManagerConfiguration,
+                       HighAvailabilityServices haServices,
+                       TaskManagerServices taskExecutorServices,
+                       HeartbeatServices heartbeatServices,
+                       TaskManagerMetricGroup taskManagerMetricGroup,
+                       @Nullable String metricQueryServicePath,
+                       BlobCacheService blobCacheService,
+                       FatalErrorHandler fatalErrorHandler,
+                       CompletableFuture<Boolean> stopRegistrationFuture) {
+
+                       super(
+                               rpcService,
+                               taskManagerConfiguration,
+                               haServices,
+                               taskExecutorServices,
+                               heartbeatServices,
+                               taskManagerMetricGroup,
+                               metricQueryServicePath,
+                               blobCacheService,
+                               fatalErrorHandler);
+
+                       this.stopRegistrationFuture = stopRegistrationFuture;
+               }
+
+               @Override
+               void registrationTimeout(@Nonnull UUID registrationTimeoutId) {
+                       assertNotNull(registrationTimeoutId);
+                       stopRegistrationFuture.whenComplete((completed, 
throwable) -> {
+                               assertTrue(completed);
+                               assertNull(throwable);
+
+                               // To make sure that timeout checking will run 
after registration
+                               
super.registrationTimeout(registrationTimeoutId);
+                       });
+               }
+
+               @Override
+               void stopRegistrationTimeout() {
+                       super.stopRegistrationTimeout();
+                       stopRegistrationFuture.complete(true);
+               }
+       }
+
+       /**
+        * A implementation of {@link TaskSlotTable} that can be blocked with a 
future when starting.
+        */
+       private static final class BlockingTaskSlotTable extends TaskSlotTable {
+
+               private final CompletableFuture<Boolean> completableFuture;
+
+               private final long timeout;
+
+               BlockingTaskSlotTable(
+                       Collection<ResourceProfile> resourceProfiles,
+                       TimerService<AllocationID> timerService,
+                       CompletableFuture<Boolean> completableFuture,
+                       long timeout) {
+
+                       super(resourceProfiles, timerService);
+
+                       this.completableFuture = completableFuture;
+                       this.timeout = timeout;
+               }
+
+               @Override
+               public void start(SlotActions initialSlotActions) {
+                       try {
+                               completableFuture.get(timeout, 
TimeUnit.MILLISECONDS);
+                       } catch(Exception e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               @Override
+               public SlotReport createSlotReport(ResourceID resourceId) {
+                       return new SlotReport();
+               }
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to