StefanRRichter commented on a change in pull request #7665: [FLINK-11551][rpc] Allow RpcEndpoint to execute asynchronous stop operations
URL: https://github.com/apache/flink/pull/7665#discussion_r255898871
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 ##########
 @@ -486,122 +485,77 @@ protected void run() {
 
        @Test
        public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
-               final int dataPort = 1234;
-               final HardwareDescription hardwareDescription = new 
HardwareDescription(1, 2L, 3L, 4L);
-               final String taskManagerAddress = "tm";
-               final ResourceID taskManagerResourceID = new 
ResourceID(taskManagerAddress);
-               final ResourceID resourceManagerResourceID = 
ResourceID.generate();
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-
-               final TestingRpcService rpcService = new TestingRpcService();
-               rpcService.registerGateway(taskManagerAddress, 
taskExecutorGateway);
-
-               final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
-               final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
-               
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-
-               final long heartbeatInterval = 1L;
-               final long heartbeatTimeout = 5L;
-               final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
-               final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
-
-               final MetricRegistryImpl metricRegistry = 
mock(MetricRegistryImpl.class);
-               final JobLeaderIdService jobLeaderIdService = 
mock(JobLeaderIdService.class);
-               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
-               final SlotManager slotManager = new SlotManager(
-                       rpcService.getScheduledExecutor(),
-                       TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime());
-
-               try {
-                       final StandaloneResourceManager resourceManager = new 
StandaloneResourceManager(
-                               rpcService,
-                               FlinkResourceManager.RESOURCE_MANAGER_NAME,
-                               resourceManagerResourceID,
-                               highAvailabilityServices,
-                               heartbeatServices,
-                               slotManager,
-                               metricRegistry,
-                               jobLeaderIdService,
-                               new ClusterInformation("localhost", 1234),
-                               testingFatalErrorHandler,
-                               
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
-
-                       resourceManager.start();
-
-                       final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
-
-                       final UUID rmLeaderSessionId = UUID.randomUUID();
-                       
rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(),
 TimeUnit.MILLISECONDS);
-
-                       // test registration response successful and it will 
trigger monitor heartbeat target, schedule heartbeat request at interval time
-                       CompletableFuture<RegistrationResponse> 
successfulFuture = rmGateway.registerTaskExecutor(
-                               taskManagerAddress,
-                               taskManagerResourceID,
-                               dataPort,
-                               hardwareDescription,
-                               timeout);
-                       RegistrationResponse response = 
successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-                       assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
-
-                       ArgumentCaptor<Runnable> heartbeatRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
-                       verify(scheduledExecutor, times(2)).scheduleAtFixedRate(
-                               heartbeatRunnableCaptor.capture(),
-                               eq(0L),
-                               eq(heartbeatInterval),
-                               eq(TimeUnit.MILLISECONDS));
-
-                       List<Runnable> heartbeatRunnable = 
heartbeatRunnableCaptor.getAllValues();
-
-                       ArgumentCaptor<Runnable> timeoutRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
-                       
verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), 
eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
-
-                       Runnable timeoutRunnable = 
timeoutRunnableCaptor.getValue();
-
-                       // run all the heartbeat requests
-                       for (Runnable runnable : heartbeatRunnable) {
-                               runnable.run();
-                       }
-
-                       verify(taskExecutorGateway, 
times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID));
-
-                       // run the timeout runnable to simulate a heartbeat 
timeout
-                       timeoutRunnable.run();
-
-                       verify(taskExecutorGateway, 
Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(any(TimeoutException.class));
-
-               } finally {
-                       RpcUtils.terminateRpcService(rpcService, timeout);
-               }
+               final CompletableFuture<ResourceID> heartbeatRequestFuture = 
new CompletableFuture<>();
 
 Review comment:
   Nice refactoring :-)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to