Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2655#discussion_r84063789
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -408,106 +528,98 @@ public void run() {
         */
        @Override
        public void handleError(final Exception exception) {
    -           log.error("ResourceManager received an error from the LeaderElectionService.", exception);
    -           // terminate ResourceManager in case of an error
    -           shutDown();
    +           onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
        }
     
        /**
    -    * Registers an infoMessage listener
    +    * This method should be called by the framework once it detects that a currently registered
    +    * task executor has failed.
         *
    -    * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
    +    * @param resourceID Id of the worker that has failed.
    +    * @param message An informational message that explains why the worker failed.
         */
    -   @RpcMethod
    -   public void registerInfoMessageListener(final String infoMessageListenerAddress) {
    -           if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
    -                   log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
    -           } else {
    -                   Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
    -
    -                   infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
    -                           @Override
    -                           public void accept(InfoMessageListenerRpcGateway gateway) {
    -                                   log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
    -                                   infoMessageListeners.put(infoMessageListenerAddress, gateway);
    -                           }
    -                   }, getMainThreadExecutor());
    +   public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
    +           runAsync(new Runnable() {
    --- End diff --
    
    I'm actually just adding logging here. I guess one has to add the registration ID of the TM in order to filter out outdated `notifyWorkerFailed` calls.
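    To make the registration-ID idea concrete, here is a minimal, self-contained sketch. All names in it (`WorkerFailureFilterSketch`, `registerTaskExecutor`, `String` resource IDs, `UUID` registration IDs) are hypothetical and not Flink's actual API. It assumes that, as with the ResourceManager's `runAsync`, every call runs on a single main thread, so a plain `HashMap` is sufficient. A failure notification is only acted upon if it carries the registration ID of the TM's current registration; notifications from an earlier registration are logged and dropped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical sketch (not Flink's API): tracks the current registration ID per task
 * executor and ignores worker-failed notifications that carry a stale registration ID.
 * Assumes single-threaded access (e.g. everything runs in the RPC main thread).
 */
public class WorkerFailureFilterSketch {

    // Maps a task executor's resource ID to the ID of its currently active registration.
    private final Map<String, UUID> registrations = new HashMap<>();

    /** Registers (or re-registers) a task executor and returns a fresh registration ID. */
    public UUID registerTaskExecutor(String resourceId) {
        UUID registrationId = UUID.randomUUID();
        // A re-registration overwrites the previous ID, making older notifications stale.
        registrations.put(resourceId, registrationId);
        return registrationId;
    }

    /**
     * Handles a worker failure notification. Returns true if it referred to the current
     * registration (and the worker was removed), false if it was outdated and ignored.
     */
    public boolean notifyWorkerFailed(String resourceId, UUID registrationId, String message) {
        UUID current = registrations.get(resourceId);
        if (current == null || !current.equals(registrationId)) {
            // Outdated call: the TM has re-registered since, or was already removed.
            System.out.printf("Ignoring outdated failure notification for %s: %s%n", resourceId, message);
            return false;
        }
        registrations.remove(resourceId);
        System.out.printf("Task executor %s failed: %s%n", resourceId, message);
        return true;
    }

    /** Returns whether the given task executor currently has an active registration. */
    public boolean isRegistered(String resourceId) {
        return registrations.containsKey(resourceId);
    }

    public static void main(String[] args) {
        WorkerFailureFilterSketch rm = new WorkerFailureFilterSketch();
        UUID first = rm.registerTaskExecutor("tm-1");
        UUID second = rm.registerTaskExecutor("tm-1"); // the TM reconnects
        rm.notifyWorkerFailed("tm-1", first, "heartbeat timeout"); // stale: ignored
        rm.notifyWorkerFailed("tm-1", second, "container lost");   // current: handled
    }
}
```

    The design choice here is to compare against the latest registration rather than just checking whether the resource ID is known: a TM that fails and quickly re-registers keeps its resource ID, so only the registration ID can tell an old failure notification apart from one about the live registration.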

