wangyang0918 commented on a change in pull request #17362:
URL: https://github.com/apache/flink/pull/17362#discussion_r716473958
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -492,7 +492,8 @@ protected void onWorkerRegistered(WorkerType worker) {
@Override
public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
- closeTaskManagerConnection(resourceId, cause);
+ closeTaskManagerConnection(resourceId, cause).ifPresent(ResourceManager.this::stopWorker);
Review comment:
I am afraid we probably do not need to stop the worker here. If the
TaskManager wants to reconnect to the ResourceManager, it will first call
`closeResourceManagerConnection`, as well as `disconnectTaskManager`. In
that case, the TaskManager would be released.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -492,7 +492,8 @@ protected void onWorkerRegistered(WorkerType worker) {
@Override
public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
- closeTaskManagerConnection(resourceId, cause);
+ closeTaskManagerConnection(resourceId, cause).ifPresent(ResourceManager.this::stopWorker);
+ ;
Review comment:
Unnecessary empty line.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
##########
@@ -492,7 +492,8 @@ protected void onWorkerRegistered(WorkerType worker) {
@Override
public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) {
- closeTaskManagerConnection(resourceId, cause);
+ closeTaskManagerConnection(resourceId, cause).ifPresent(ResourceManager.this::stopWorker);
Review comment:
After more offline discussion with @xintongsong, I now think it is
reasonable to also call `stopWorker` in `disconnectTaskManager`. Maybe we
could add a test to guard this behavior.
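To illustrate the kind of guard I have in mind, here is a minimal, self-contained sketch. It does not use Flink's real classes: `DisconnectSketch`, `SketchResourceManager`, the `register` helper, and the plain `String` ids are all hypothetical stand-ins, and the `Optional` return type of `closeTaskManagerConnection` is only inferred from the `.ifPresent(...)` call in the diff. A real test would go through the existing test utilities instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the ResourceManager; not Flink's real class. */
public class DisconnectSketch {

    static class SketchResourceManager {
        // Registered TaskManagers keyed by a plain String id (an assumption;
        // the real code uses ResourceID and WorkerType).
        private final Map<String, String> registeredWorkers = new HashMap<>();

        // Completed with the worker passed to stopWorker so a test can observe the call.
        final CompletableFuture<String> stopWorkerFuture = new CompletableFuture<>();

        void register(String resourceId) {
            registeredWorkers.put(resourceId, resourceId);
        }

        // Shape inferred from the diff: yields the worker only if a connection was closed.
        Optional<String> closeTaskManagerConnection(String resourceId, Exception cause) {
            return Optional.ofNullable(registeredWorkers.remove(resourceId));
        }

        void stopWorker(String worker) {
            stopWorkerFuture.complete(worker);
        }

        void disconnectTaskManager(String resourceId, Exception cause) {
            closeTaskManagerConnection(resourceId, cause).ifPresent(this::stopWorker);
        }
    }

    public static void main(String[] args) {
        SketchResourceManager rm = new SketchResourceManager();
        rm.register("tm-1");
        rm.disconnectTaskManager("tm-1", new Exception("disconnect requested"));

        // The guard: disconnecting a registered TaskManager must stop its worker.
        if (!"tm-1".equals(rm.stopWorkerFuture.getNow(null))) {
            throw new AssertionError("stopWorker was not called for tm-1");
        }

        // An unknown TaskManager has no connection to close, so nothing is stopped.
        SketchResourceManager other = new SketchResourceManager();
        other.disconnectTaskManager("unknown", new Exception("disconnect requested"));
        if (other.stopWorkerFuture.isDone()) {
            throw new AssertionError("stopWorker must not be called for an unknown id");
        }
    }
}
```

Capturing the stopped worker in a future keeps the assertion independent of how the worker is actually released, which seems to match the `stopWorkerFuture` approach taken in `ResourceManagerTest`.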
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
##########
@@ -422,13 +433,15 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
optionalHeartbeatRequestOrigin,
anyOf(is(resourceManagerResourceId), is(nullValue())));
assertThat(disconnectFuture.get(), instanceOf(TimeoutException.class));
+ assertThat(stopWorkerFuture.get(), is(taskExecutorId));
Review comment:
nit: the deprecated `org.junit.Assert.assertThat` could be replaced with
`org.hamcrest.MatcherAssert.assertThat`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.