[ https://issues.apache.org/jira/browse/FLINK-7334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16111251#comment-16111251 ]

ASF GitHub Bot commented on FLINK-7334:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4462#discussion_r130929937
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---
    @@ -755,48 +751,42 @@ public void failSlot(final ResourceID taskManagerId,
                if (registeredTaskManagers.containsKey(taskManagerId)) {
                        final RegistrationResponse response = new JMTMRegistrationSuccess(
                                resourceId, libraryCacheManager.getBlobServerPort());
    -                   return FlinkCompletableFuture.completed(response);
    +                   return CompletableFuture.completedFuture(response);
                } else {
    -                   return getRpcService().execute(new Callable<TaskExecutorGateway>() {
    -                           @Override
    -                           public TaskExecutorGateway call() throws Exception {
    -                                   return getRpcService().connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    -                                                   .get(rpcTimeout.getSize(), rpcTimeout.getUnit());
    -                           }
    -                   }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
    -                           @Override
    -                           public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway, Throwable throwable) {
    -                                   if (throwable != null) {
    -                                           return new RegistrationResponse.Decline(throwable.getMessage());
    -                                   }
    -
    -                                   if (!JobMaster.this.leaderSessionID.equals(leaderId)) {
    -                                           log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
    -                                                                           "leader session ID {} did not equal the received leader session ID {}.",
    -                                                           taskManagerId, taskManagerRpcAddress,
    -                                                           JobMaster.this.leaderSessionID, leaderId);
    -                                           return new RegistrationResponse.Decline("Invalid leader session id");
    -                                   }
    -
    -                                   slotPoolGateway.registerTaskManager(taskManagerId);
    -                                   registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
    -
    -                                   // monitor the task manager as heartbeat target
    -                                   taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
    -                                           @Override
    -                                           public void receiveHeartbeat(ResourceID resourceID, Void payload) {
    -                                                   // the task manager will not request heartbeat, so this method will never be called currently
    +                   return getRpcService()
    +                           .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
    --- End diff --
    
    Because we were needlessly blocking a thread from the `RpcService`'s
    `Executor` by calling `get` on the future returned by `RpcService#connect`.
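
To illustrate the point, here is a minimal, self-contained sketch contrasting the two shapes. It is not the Flink code: `connect` is a hypothetical stand-in for `RpcService#connect`, the gateway is modelled as a plain `String`, and `handle` is used in place of the `handleAsync` call from the diff to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConnectSketch {

    // Hypothetical stand-in for RpcService#connect: the gateway arrives asynchronously.
    static CompletableFuture<String> connect(String address) {
        return CompletableFuture.supplyAsync(() -> "gateway@" + address);
    }

    // Old shape: a task is submitted to an executor only to park one of its
    // threads in get() until the connection future completes.
    static CompletableFuture<String> registerBlocking(ExecutorService executor, String address) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return connect(address).get(10, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, executor).handle((gateway, failure) ->
            failure != null ? "declined" : "registered " + gateway);
    }

    // New shape: the follow-up logic is chained onto the future itself,
    // so no thread waits for the connection.
    static CompletableFuture<String> registerNonBlocking(String address) {
        return connect(address).handle((gateway, failure) ->
            failure != null ? "declined" : "registered " + gateway);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(registerBlocking(executor, "tm-1").get());
            System.out.println(registerNonBlocking("tm-1").get());
        } finally {
            executor.shutdown();
        }
    }
}
```

Both variants produce the same result; the difference is that the blocking one occupies an executor thread for the full duration of the connection attempt.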


> Replace Flink's futures by CompletableFuture in RpcGateway
> ----------------------------------------------------------
>
>                 Key: FLINK-7334
>                 URL: https://issues.apache.org/jira/browse/FLINK-7334
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
