Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/6067#discussion_r190583266
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID,
SlotReport slotReport) {
blobCacheService.setBlobServerAddress(blobServerAddress);
+ establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
+ resourceManagerGateway,
+ resourceManagerResourceId,
+ taskExecutorRegistrationId);
+
stopRegistrationTimeout();
}
private void closeResourceManagerConnection(Exception cause) {
- if (resourceManagerConnection != null) {
-
- if (resourceManagerConnection.isConnected()) {
- if (log.isDebugEnabled()) {
- log.debug("Close ResourceManager connection {}.",
- resourceManagerConnection.getResourceManagerId(), cause);
- } else {
- log.info("Close ResourceManager connection {}.",
- resourceManagerConnection.getResourceManagerId());
- }
- resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+ if (establishedResourceManagerConnection != null) {
+ final ResourceID resourceManagerResourceId = establishedResourceManagerConnection.getResourceManagerResourceId();
- ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
- resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+ if (log.isDebugEnabled()) {
+ log.debug("Close ResourceManager connection {}.",
--- End diff --
I was wondering whether `cause` can get logged twice, once by each of these calls:
1.
```
log.debug("Close ResourceManager connection {}.",
resourceManagerResourceId, cause);
```
2.
```
log.debug("Terminating registration attempts towards ResourceManager {}.",
resourceManagerConnection.getTargetAddress(), cause);
```
---