[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588696#comment-15588696 ]

ASF GitHub Bot commented on FLINK-4851:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2655#discussion_r84063789
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -408,106 +528,98 @@ public void run() {
         */
        @Override
        public void handleError(final Exception exception) {
    -           log.error("ResourceManager received an error from the LeaderElectionService.", exception);
    -           // terminate ResourceManager in case of an error
    -           shutDown();
    +           onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
        }
     
        /**
    -    * Registers an infoMessage listener
    +    * This method should be called by the framework once it detects that a currently registered
    +    * task executor has failed.
         *
    -    * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
    +    * @param resourceID Id of the worker that has failed.
    +    * @param message An informational message that explains why the worker failed.
         */
    -   @RpcMethod
    -   public void registerInfoMessageListener(final String infoMessageListenerAddress) {
    -           if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
    -                   log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
    -           } else {
    -                   Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
    -
    -                   infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
    -                           @Override
    -                           public void accept(InfoMessageListenerRpcGateway gateway) {
    -                                   log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
    -                                   infoMessageListeners.put(infoMessageListenerAddress, gateway);
    -                           }
    -                   }, getMainThreadExecutor());
    +   public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
    +           runAsync(new Runnable() {
    --- End diff --
    
    I actually just added logging. I guess one has to add the registration ID of the TM to filter out outdated notifyWorkerFailed calls.
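A minimal sketch of the filtering idea from the comment above, assuming every task executor registration gets a fresh ID and each failure notification carries the ID of the registration it refers to. The class `WorkerRegistry`, its methods, and the three-argument `notifyWorkerFailed(resourceId, registrationId, message)` are hypothetical illustrations, not Flink's `ResourceManager` API; plain `String` and `UUID` stand in for `ResourceID` and the registration ID, and single-threaded access (as with `runAsync` on the main thread) is assumed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch, not Flink code: a failure notification is only acted upon
// if it carries the ID of the worker's current registration.
// Assumes single-threaded access (e.g. everything runs on one main thread).
class WorkerRegistry {

    // Resource ID of a worker -> ID of its most recent registration.
    private final Map<String, UUID> registrations = new HashMap<>();

    // Registers (or re-registers) a worker and returns a fresh registration ID.
    UUID registerWorker(String resourceId) {
        UUID registrationId = UUID.randomUUID();
        registrations.put(resourceId, registrationId);
        return registrationId;
    }

    // Returns true if the notification matched the current registration and the
    // worker was removed; notifications for stale or unknown registrations are ignored.
    boolean notifyWorkerFailed(String resourceId, UUID registrationId, String message) {
        // Map.remove(key, value) removes the entry only if it maps to exactly this value.
        if (registrations.remove(resourceId, registrationId)) {
            System.out.println("Worker " + resourceId + " failed: " + message);
            return true;
        }
        System.out.println("Ignoring outdated failure notification for " + resourceId + ": " + message);
        return false;
    }

    boolean isRegistered(String resourceId) {
        return registrations.containsKey(resourceId);
    }

    public static void main(String[] args) {
        WorkerRegistry registry = new WorkerRegistry();
        UUID first = registry.registerWorker("tm-1");
        UUID second = registry.registerWorker("tm-1"); // the TM re-registered
        boolean staleHandled = registry.notifyWorkerFailed("tm-1", first, "heartbeat timeout");
        boolean currentHandled = registry.notifyWorkerFailed("tm-1", second, "heartbeat timeout");
        System.out.println(staleHandled + " " + currentHandled); // prints "false true"
    }
}
```

Because the comparison and the removal happen in a single `Map.remove(key, value)` call, a late notification about an earlier incarnation of a TM cannot evict its newer registration.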


> Add FatalErrorHandler and MetricRegistry to ResourceManager
> -----------------------------------------------------------
>
>                 Key: FLINK-4851
>                 URL: https://issues.apache.org/jira/browse/FLINK-4851
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}. 
> In order to harmonize the fatal error handling across all components, we 
> should introduce a {{FatalErrorHandler}}, which handles fatal errors. 
> Additionally, we should also give a {{MetricRegistry}} to the 
> {{ResourceManager}} so that it can report metrics.
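A minimal sketch of the fatal-error half of this issue, mirroring the `handleError` change in the diff: rather than logging and calling `shutDown()` itself, the component wraps the cause in a component-specific exception and forwards it asynchronously to an injected handler. The single-method `FatalErrorHandler` interface here is a simplified stand-in, and `ResourceManagerException`, `SketchResourceManager`, and the single-thread executor standing in for the RPC main thread are assumptions for illustration, not Flink's actual classes. The `MetricRegistry` part is not sketched.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for a fatal error handler (assumption, not Flink's exact API).
interface FatalErrorHandler {
    void onFatalError(Throwable exception);
}

// Hypothetical component-specific exception that keeps the original cause.
class ResourceManagerException extends Exception {
    ResourceManagerException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical component that delegates fatal errors instead of shutting itself down.
class SketchResourceManager {
    private final FatalErrorHandler fatalErrorHandler;
    // Single-threaded executor standing in for the RPC endpoint's main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    SketchResourceManager(FatalErrorHandler fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
    }

    // Called when the leader election service reports an error.
    void handleError(Exception exception) {
        onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
    }

    // Hands the error to the handler on the "main thread" without blocking the caller.
    private void onFatalErrorAsync(Throwable t) {
        mainThread.execute(() -> fatalErrorHandler.onFatalError(t));
    }

    // Stops the executor, waiting up to 5 s for already queued handler calls to run.
    void close() {
        mainThread.shutdown();
        try {
            mainThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class FatalErrorDemo {
    public static void main(String[] args) {
        List<Throwable> reported = new CopyOnWriteArrayList<>();
        SketchResourceManager resourceManager = new SketchResourceManager(reported::add);
        resourceManager.handleError(new IllegalStateException("leader election failed"));
        resourceManager.close();
        Throwable fatal = reported.get(0);
        System.out.println(fatal.getMessage() + " <- " + fatal.getCause().getMessage());
    }
}
```

Injecting the handler lets each deployment decide what "fatal" means (for example, terminating the process), while every component reports errors the same way.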



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
