[ 
https://issues.apache.org/jira/browse/FLINK-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525719#comment-15525719
 ] 

ASF GitHub Bot commented on FLINK-4606:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2540#discussion_r80663028
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ---
    @@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
                shutDown();
        }
     
    +   /**
    +    * Registers an infoMessage listener
    +    *
    +    * @param infoMessageListenerAddress address of infoMessage listener to 
register to this resource manager
    +    */
    +   @RpcMethod
    +   public void registerInfoMessageListener(final String 
infoMessageListenerAddress) {
    +           
if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
    +                   log.warn("Receive a duplicate registration from info 
message listener on ({})", infoMessageListenerAddress);
    +           } else {
    +                   Future<InfoMessageListenerRpcGateway> 
infoMessageListenerRpcGatewayFuture = 
getRpcService().connect(infoMessageListenerAddress, 
InfoMessageListenerRpcGateway.class);
    +
    +                   infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new 
AcceptFunction<InfoMessageListenerRpcGateway>() {
    +                           @Override
    +                           public void 
accept(InfoMessageListenerRpcGateway gateway) {
    +                                   log.info("Receive a registration from 
info message listener on ({})", infoMessageListenerAddress);
    +                                   
infoMessageListeners.put(infoMessageListenerAddress, gateway);
    +                           }
    +                   }, getMainThreadExecutor());
    +
    +                   
infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
    +                           @Override
    +                           public Void apply(Throwable failure) {
    +                                   log.warn("Receive a registration from 
unreachable info message listener on ({})", infoMessageListenerAddress);
    +                                   return null;
    +                           }
    +                   }, getMainThreadExecutor());
    +           }
    +   }
    +
    +   /**
    +    * Unregisters an infoMessage listener
    +    *
    +    * @param infoMessageListenerAddress address of infoMessage listener to 
unregister from this resource manager
    +    *
    +    */
    +   @RpcMethod
    +   public void unRegisterInfoMessageListener(final String 
infoMessageListenerAddress) {
    +           infoMessageListeners.remove(infoMessageListenerAddress);
    +   }
    +
    +   /**
    +    * Shuts down the cluster
    +    *
    +    * @param finalStatus
    +    * @param optionalDiagnostics
    +    */
    +   @RpcMethod
    +   public void shutDownCluster(final ApplicationStatus finalStatus, final 
String optionalDiagnostics) {
    +           log.info("shut down cluster because application is in {}, 
diagnostics {}", finalStatus, optionalDiagnostics);
    +           shutDownApplication(finalStatus, optionalDiagnostics);
    +   }
    +
    +   /**
    +    * This method should be called by the framework once it detects that a 
currently registered task executor has failed.
    +    *
    +    * @param resourceID Id of the worker that has failed.
    +    * @param message An informational message that explains why the worker 
failed.
    +    */
    +   public void notifyWorkerFailed(final ResourceID resourceID, String 
message) {
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           WorkerType worker = 
taskExecutorGateways.remove(resourceID);
    +                           if (worker != null) {
    +                                   // TODO :: suggest failed task executor 
to stop itself
    +                                   
slotManager.notifyTaskManagerFailure(resourceID);
    +                           }
    +                   }
    +           });
    +   }
    +
    +   /**
    +    * Gets the number of currently started TaskManagers.
    +    *
    +    * @return The number of currently started TaskManagers.
    +    */
    +   public int getNumberOfStartedTaskManagers() {
    +           return taskExecutorGateways.size();
    +   }
    +
    +   /**
    +    * Notifies the resource manager of a fatal error.
    +    *
    +    * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, 
but exit it in
    +    * such a way that a high-availability setting would restart this or 
fail over
    +    * to another master.
    +    */
    +   public void onFatalError(final String message, final Throwable error) {
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           fatalError(message, error);
    +                   }
    +           });
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Framework specific behavior
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Initializes the framework specific components.
    +    *
    +    * @throws Exception Exceptions during initialization cause the 
resource manager to fail.
    +    */
    +   protected abstract void initialize() throws Exception;
    +
    +   /**
    +    * Callback when a task executor registers.
    +    *
    +    * @param resourceID The worker resource id
    +    * @param taskExecutorGateway the task executor gateway
    +    */
    +   protected abstract WorkerType workerStarted(ResourceID resourceID, 
TaskExecutorGateway taskExecutorGateway);
    --- End diff --
    
    This is missing all the other abstract methods of the old ResourceManager 
(the existing `FlinkResourceManager`). We will also need `requestNewWorkers`, 
`releasePendingWorker`, `releaseStartedWorker`, and `reacceptRegisteredWorkers`.


> Integrate the new ResourceManager with the existing FlinkResourceManager
> ------------------------------------------------------------------------
>
>                 Key: FLINK-4606
>                 URL: https://issues.apache.org/jira/browse/FLINK-4606
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> Integrate the new ResourceManager with the existing FlinkResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to