GitHub user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/6067#discussion_r190603934
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
---
@@ -933,25 +956,35 @@ public void requestHeartbeat(ResourceID resourceID,
SlotReport slotReport) {
blobCacheService.setBlobServerAddress(blobServerAddress);
+ establishedResourceManagerConnection = new
EstablishedResourceManagerConnection(
+ resourceManagerGateway,
+ resourceManagerResourceId,
+ taskExecutorRegistrationId);
+
stopRegistrationTimeout();
}
private void closeResourceManagerConnection(Exception cause) {
- if (resourceManagerConnection != null) {
-
- if (resourceManagerConnection.isConnected()) {
- if (log.isDebugEnabled()) {
- log.debug("Close ResourceManager
connection {}.",
-
resourceManagerConnection.getResourceManagerId(), cause);
- } else {
- log.info("Close ResourceManager
connection {}.",
-
resourceManagerConnection.getResourceManagerId());
- }
-
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerId());
+ if (establishedResourceManagerConnection != null) {
+ final ResourceID resourceManagerResourceId =
establishedResourceManagerConnection.getResourceManagerResourceId();
- ResourceManagerGateway resourceManagerGateway =
resourceManagerConnection.getTargetGateway();
-
resourceManagerGateway.disconnectTaskManager(getResourceID(), cause);
+ if (log.isDebugEnabled()) {
+ log.debug("Close ResourceManager connection
{}.",
--- End diff --
We already talked about it offline.
---