Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/6067#discussion_r190557380
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
---
@@ -732,19 +744,10 @@ public void heartbeatFromResourceManager(ResourceID resourceID) {
allocationId, jobId, resourceManagerId);
try {
- if (resourceManagerConnection == null) {
- final String message = "TaskManager is not connected to a resource manager.";
+ if (!isConnectedToResourceManager(resourceManagerId)) {
+ final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
log.debug(message);
- throw new SlotAllocationException(message);
- }
-
- if (!Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) {
- final String message = "The leader id " + resourceManagerId +
- " does not match with the leader id of the connected resource manager " +
- resourceManagerConnection.getTargetLeaderId() + '.';
-
- log.debug(message);
- throw new SlotAllocationException(message);
+ throw new TaskManagerException(message);
--- End diff --
Why not return an exceptional future here?
---