StefanRRichter commented on a change in pull request #7665: [FLINK-11551][rpc]
Allow RpcEndpoint to execute asynchronous stop operations
URL: https://github.com/apache/flink/pull/7665#discussion_r255898871
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
##########
@@ -486,122 +485,77 @@ protected void run() {
@Test
public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
- final int dataPort = 1234;
- final HardwareDescription hardwareDescription = new
HardwareDescription(1, 2L, 3L, 4L);
- final String taskManagerAddress = "tm";
- final ResourceID taskManagerResourceID = new
ResourceID(taskManagerAddress);
- final ResourceID resourceManagerResourceID =
ResourceID.generate();
- final TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
-
- final TestingRpcService rpcService = new TestingRpcService();
- rpcService.registerGateway(taskManagerAddress,
taskExecutorGateway);
-
- final TestingLeaderElectionService rmLeaderElectionService =
new TestingLeaderElectionService();
- final TestingHighAvailabilityServices highAvailabilityServices
= new TestingHighAvailabilityServices();
-
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-
- final long heartbeatInterval = 1L;
- final long heartbeatTimeout = 5L;
- final ScheduledExecutor scheduledExecutor =
mock(ScheduledExecutor.class);
- final HeartbeatServices heartbeatServices = new
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout,
scheduledExecutor);
-
- final MetricRegistryImpl metricRegistry =
mock(MetricRegistryImpl.class);
- final JobLeaderIdService jobLeaderIdService =
mock(JobLeaderIdService.class);
- final TestingFatalErrorHandler testingFatalErrorHandler = new
TestingFatalErrorHandler();
- final SlotManager slotManager = new SlotManager(
- rpcService.getScheduledExecutor(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime(),
- TestingUtils.infiniteTime());
-
- try {
- final StandaloneResourceManager resourceManager = new
StandaloneResourceManager(
- rpcService,
- FlinkResourceManager.RESOURCE_MANAGER_NAME,
- resourceManagerResourceID,
- highAvailabilityServices,
- heartbeatServices,
- slotManager,
- metricRegistry,
- jobLeaderIdService,
- new ClusterInformation("localhost", 1234),
- testingFatalErrorHandler,
-
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
-
- resourceManager.start();
-
- final ResourceManagerGateway rmGateway =
resourceManager.getSelfGateway(ResourceManagerGateway.class);
-
- final UUID rmLeaderSessionId = UUID.randomUUID();
-
rmLeaderElectionService.isLeader(rmLeaderSessionId).get(timeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
-
- // test registration response successful and it will
trigger monitor heartbeat target, schedule heartbeat request at interval time
- CompletableFuture<RegistrationResponse>
successfulFuture = rmGateway.registerTaskExecutor(
- taskManagerAddress,
- taskManagerResourceID,
- dataPort,
- hardwareDescription,
- timeout);
- RegistrationResponse response =
successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- assertTrue(response instanceof
TaskExecutorRegistrationSuccess);
-
- ArgumentCaptor<Runnable> heartbeatRunnableCaptor =
ArgumentCaptor.forClass(Runnable.class);
- verify(scheduledExecutor, times(2)).scheduleAtFixedRate(
- heartbeatRunnableCaptor.capture(),
- eq(0L),
- eq(heartbeatInterval),
- eq(TimeUnit.MILLISECONDS));
-
- List<Runnable> heartbeatRunnable =
heartbeatRunnableCaptor.getAllValues();
-
- ArgumentCaptor<Runnable> timeoutRunnableCaptor =
ArgumentCaptor.forClass(Runnable.class);
-
verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(),
eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
-
- Runnable timeoutRunnable =
timeoutRunnableCaptor.getValue();
-
- // run all the heartbeat requests
- for (Runnable runnable : heartbeatRunnable) {
- runnable.run();
- }
-
- verify(taskExecutorGateway,
times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID));
-
- // run the timeout runnable to simulate a heartbeat
timeout
- timeoutRunnable.run();
-
- verify(taskExecutorGateway,
Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(any(TimeoutException.class));
-
- } finally {
- RpcUtils.terminateRpcService(rpcService, timeout);
- }
+ final CompletableFuture<ResourceID> heartbeatRequestFuture =
new CompletableFuture<>();
Review comment:
Nice refactoring :-)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services