tillrohrmann commented on a change in pull request #15407:
URL: https://github.com/apache/flink/pull/15407#discussion_r605064618
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -509,6 +509,11 @@ public void disconnectTaskManager(final ResourceID
resourceId, final Exception c
@Override
public void disconnectJobManager(
final JobID jobId, JobStatus jobStatus, final Exception cause) {
+
+ if (jobStatus.isGloballyTerminalState()) {
Review comment:
Can we change `disconnectJobManager` to
```
if (jobStatus.isGloballyTerminalState()) {
removeJob(jobId, cause);
} else {
closeJobManagerConnection(jobId, RETAIN, cause);
}
```
I think that way it is a bit clearer that we remove the job if the
`jobStatus` is globally terminal.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -271,6 +273,53 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws
Exception {
});
}
+ @Test
+ public void testDisconnectJobManagerWithTerminalState() throws Exception {
+ testDisconnectJobManager(JobStatus.CANCELED);
+ }
+
+ @Test
+ public void testDisconnectJobManagerWithNonTerminalState() throws
Exception {
+ testDisconnectJobManager(JobStatus.FAILING);
+ }
+
+ private void testDisconnectJobManager(JobStatus jobStatus) throws
Exception {
+ final TestingJobMasterGateway jobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setAddress(UUID.randomUUID().toString())
+ .build();
+ rpcService.registerGateway(jobMasterGateway.getAddress(),
jobMasterGateway);
+
+ final JobLeaderIdService jobLeaderIdService =
+ new JobLeaderIdService(
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor(),
+ TestingUtils.infiniteTime());
+ resourceManager = createAndStartResourceManager(heartbeatServices,
jobLeaderIdService);
+
+ highAvailabilityServices.setJobMasterLeaderRetrieverFunction(
+ requestedJobId ->
+ new SettableLeaderRetrievalService(
+ jobMasterGateway.getAddress(),
+ jobMasterGateway.getFencingToken().toUUID()));
+
+ final JobID jobId = JobID.generate();
+ final ResourceManagerGateway resourceManagerGateway =
+ resourceManager.getSelfGateway(ResourceManagerGateway.class);
+ resourceManagerGateway
+ .registerJobManager(
+ jobMasterGateway.getFencingToken(),
+ ResourceID.generate(),
+ jobMasterGateway.getAddress(),
+ jobId,
+ TIMEOUT)
+ .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertThat(jobLeaderIdService.containsJob(jobId), is(true));
+ resourceManager.disconnectJobManager(jobId, jobStatus, null);
+ assertThat(jobLeaderIdService.containsJob(jobId),
is(!jobStatus.isGloballyTerminalState()));
Review comment:
Same here: `jobLeaderIdService` is accessed outside of the main thread, which is not thread-safe.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -271,6 +273,53 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws
Exception {
});
}
+ @Test
+ public void testDisconnectJobManagerWithTerminalState() throws Exception {
Review comment:
I think it helps if the test name states the expected outcome.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -271,6 +273,53 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws
Exception {
});
}
+ @Test
+ public void testDisconnectJobManagerWithTerminalState() throws Exception {
+ testDisconnectJobManager(JobStatus.CANCELED);
+ }
+
+ @Test
+ public void testDisconnectJobManagerWithNonTerminalState() throws
Exception {
Review comment:
Same here: the test name should state the expected outcome.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -271,6 +273,53 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws
Exception {
});
}
+ @Test
+ public void testDisconnectJobManagerWithTerminalState() throws Exception {
+ testDisconnectJobManager(JobStatus.CANCELED);
+ }
+
+ @Test
+ public void testDisconnectJobManagerWithNonTerminalState() throws
Exception {
+ testDisconnectJobManager(JobStatus.FAILING);
+ }
+
+ private void testDisconnectJobManager(JobStatus jobStatus) throws
Exception {
+ final TestingJobMasterGateway jobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setAddress(UUID.randomUUID().toString())
+ .build();
+ rpcService.registerGateway(jobMasterGateway.getAddress(),
jobMasterGateway);
+
+ final JobLeaderIdService jobLeaderIdService =
+ new JobLeaderIdService(
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor(),
+ TestingUtils.infiniteTime());
+ resourceManager = createAndStartResourceManager(heartbeatServices,
jobLeaderIdService);
+
+ highAvailabilityServices.setJobMasterLeaderRetrieverFunction(
+ requestedJobId ->
+ new SettableLeaderRetrievalService(
+ jobMasterGateway.getAddress(),
+ jobMasterGateway.getFencingToken().toUUID()));
+
+ final JobID jobId = JobID.generate();
+ final ResourceManagerGateway resourceManagerGateway =
+ resourceManager.getSelfGateway(ResourceManagerGateway.class);
+ resourceManagerGateway
+ .registerJobManager(
+ jobMasterGateway.getFencingToken(),
+ ResourceID.generate(),
+ jobMasterGateway.getAddress(),
+ jobId,
+ TIMEOUT)
+ .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertThat(jobLeaderIdService.containsJob(jobId), is(true));
+ resourceManager.disconnectJobManager(jobId, jobStatus, null);
Review comment:
I think we should not directly call `disconnectJobManager` on the
`resourceManager` but instead use the gateway.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -271,6 +273,53 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws
Exception {
});
}
+ @Test
+ public void testDisconnectJobManagerWithTerminalState() throws Exception {
+ testDisconnectJobManager(JobStatus.CANCELED);
+ }
+
+ @Test
+ public void testDisconnectJobManagerWithNonTerminalState() throws
Exception {
+ testDisconnectJobManager(JobStatus.FAILING);
+ }
+
+ private void testDisconnectJobManager(JobStatus jobStatus) throws
Exception {
+ final TestingJobMasterGateway jobMasterGateway =
+ new TestingJobMasterGatewayBuilder()
+ .setAddress(UUID.randomUUID().toString())
+ .build();
+ rpcService.registerGateway(jobMasterGateway.getAddress(),
jobMasterGateway);
+
+ final JobLeaderIdService jobLeaderIdService =
+ new JobLeaderIdService(
+ highAvailabilityServices,
+ rpcService.getScheduledExecutor(),
+ TestingUtils.infiniteTime());
+ resourceManager = createAndStartResourceManager(heartbeatServices,
jobLeaderIdService);
+
+ highAvailabilityServices.setJobMasterLeaderRetrieverFunction(
+ requestedJobId ->
+ new SettableLeaderRetrievalService(
+ jobMasterGateway.getAddress(),
+ jobMasterGateway.getFencingToken().toUUID()));
+
+ final JobID jobId = JobID.generate();
+ final ResourceManagerGateway resourceManagerGateway =
+ resourceManager.getSelfGateway(ResourceManagerGateway.class);
+ resourceManagerGateway
+ .registerJobManager(
+ jobMasterGateway.getFencingToken(),
+ ResourceID.generate(),
+ jobMasterGateway.getAddress(),
+ jobId,
+ TIMEOUT)
+ .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+ assertThat(jobLeaderIdService.containsJob(jobId), is(true));
Review comment:
I think accessing these services outside of the main thread is dangerous,
as they are not thread-safe and might give us wrong results.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]