[
https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111251#comment-16111251
]
ASF GitHub Bot commented on FLINK-7334:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4462#discussion_r130929937
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
---
@@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId,
if (registeredTaskManagers.containsKey(taskManagerId)) {
final RegistrationResponse response = new
JMTMRegistrationSuccess(
resourceId,
libraryCacheManager.getBlobServerPort());
- return FlinkCompletableFuture.completed(response);
+ return CompletableFuture.completedFuture(response);
} else {
- return getRpcService().execute(new
Callable<TaskExecutorGateway>() {
- @Override
- public TaskExecutorGateway call() throws
Exception {
- return
getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
-
.get(rpcTimeout.getSize(), rpcTimeout.getUnit());
- }
- }).handleAsync(new BiFunction<TaskExecutorGateway,
Throwable, RegistrationResponse>() {
- @Override
- public RegistrationResponse apply(final
TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
- if (throwable != null) {
- return new
RegistrationResponse.Decline(throwable.getMessage());
- }
-
- if
(!JobMaster.this.leaderSessionID.equals(leaderId)) {
- log.warn("Discard registration
from TaskExecutor {} at ({}) because the expected " +
-
"leader session ID {} did not equal the received leader session ID {}.",
- taskManagerId,
taskManagerRpcAddress,
-
JobMaster.this.leaderSessionID, leaderId);
- return new
RegistrationResponse.Decline("Invalid leader session id");
- }
-
-
slotPoolGateway.registerTaskManager(taskManagerId);
-
registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation,
taskExecutorGateway));
-
- // monitor the task manager as
heartbeat target
-
taskManagerHeartbeatManager.monitorTarget(taskManagerId, new
HeartbeatTarget<Void>() {
- @Override
- public void
receiveHeartbeat(ResourceID resourceID, Void payload) {
- // the task manager
will not request heartbeat, so this method will never be called currently
+ return getRpcService()
+ .connect(taskManagerRpcAddress,
TaskExecutorGateway.class)
--- End diff --
Because we were blocking a thread from the `RpcService's` `Executor`
without a reason by calling `get` on the returned future by
`RpcService#connect`.
> Replace Flink's futures by CompletableFuture in RpcGateway
> ----------------------------------------------------------
>
> Key: FLINK-7334
> URL: https://issues.apache.org/jira/browse/FLINK-7334
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.4.0
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)