Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4462#discussion_r130929937
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
---
@@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId,
if (registeredTaskManagers.containsKey(taskManagerId)) {
final RegistrationResponse response = new
JMTMRegistrationSuccess(
resourceId,
libraryCacheManager.getBlobServerPort());
- return FlinkCompletableFuture.completed(response);
+ return CompletableFuture.completedFuture(response);
} else {
- return getRpcService().execute(new
Callable<TaskExecutorGateway>() {
- @Override
- public TaskExecutorGateway call() throws
Exception {
- return
getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
-
.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
- }
- }).handleAsync(new BiFunction<TaskExecutorGateway,
Throwable, RegistrationResponse>() {
- @Override
- public RegistrationResponse apply(final
TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
- if (throwable != null) {
- return new
RegistrationResponse.Decline(throwable.getMessage());
- }
-
- if
(!JobMaster.this.leaderSessionID.equals(leaderId)) {
- log.warn("Discard registration
from TaskExecutor {} at ({}) because the expected " +
-
"leader session ID {} did not equal the received leader session ID {}.",
- taskManagerId,
taskManagerRpcAddress,
-
JobMaster.this.leaderSessionID, leaderId);
- return new
RegistrationResponse.Decline("Invalid leader session id");
- }
-
-
slotPoolGateway.registerTaskManager(taskManagerId);
-
registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation,
taskExecutorGateway));
-
- // monitor the task manager as
heartbeat target
-
taskManagerHeartbeatManager.monitorTarget(taskManagerId, new
HeartbeatTarget<Void>() {
- @Override
- public void
receiveHeartbeat(ResourceID resourceID, Void payload) {
- // the task manager
will not request heartbeat, so this method will never be called currently
+ return getRpcService()
+ .connect(taskManagerRpcAddress,
TaskExecutorGateway.class)
--- End diff --
Because we were blocking a thread from the `RpcService's` `Executor`
without a reason by calling `get` on the returned future by
`RpcService#connect`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---