wangyang0918 commented on a change in pull request #17362:
URL: https://github.com/apache/flink/pull/17362#discussion_r716473958



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -492,7 +492,8 @@ protected void onWorkerRegistered(WorkerType worker) {
 
     @Override
     public void disconnectTaskManager(final ResourceID resourceId, final 
Exception cause) {
-        closeTaskManagerConnection(resourceId, cause);
+        closeTaskManagerConnection(resourceId, 
cause).ifPresent(ResourceManager.this::stopWorker);

Review comment:
       I am afraid we probably not need to stop worker here. If the TaskManager 
want to reconnect to the ResourceManager, it will call 
`closeResourceManagerConnection` first, as well as `disconnectTaskManager`. In 
such case, the TaskManager will be released.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -492,7 +492,8 @@ protected void onWorkerRegistered(WorkerType worker) {
 
     @Override
     public void disconnectTaskManager(final ResourceID resourceId, final 
Exception cause) {
-        closeTaskManagerConnection(resourceId, cause);
+        closeTaskManagerConnection(resourceId, 
cause).ifPresent(ResourceManager.this::stopWorker);
+        ;

Review comment:
       Unnecessary empty line.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -492,7 +492,8 @@ protected void onWorkerRegistered(WorkerType worker) {
 
     @Override
     public void disconnectTaskManager(final ResourceID resourceId, final 
Exception cause) {
-        closeTaskManagerConnection(resourceId, cause);
+        closeTaskManagerConnection(resourceId, 
cause).ifPresent(ResourceManager.this::stopWorker);

Review comment:
       After more offline discussion with @xintongsong, now I think it is 
reasonable to also `stopWorker` when `disconnectTaskManager`. Maybe we could 
have a test to guard this behavior.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -422,13 +433,15 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws 
Exception {
                             optionalHeartbeatRequestOrigin,
                             anyOf(is(resourceManagerResourceId), 
is(nullValue())));
                     assertThat(disconnectFuture.get(), 
instanceOf(TimeoutException.class));
+                    assertThat(stopWorkerFuture.get(), is(taskExecutorId));

Review comment:
       nit: the deprecated `org.junit.Assert.assertThat` could be replaced with 
`org.hamcrest.MatcherAssert.assertThat`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to