Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2540#discussion_r80663028
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
    @@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
                shutDown();
        }
     
    +   /**
    +    * Registers an infoMessage listener
    +    *
    +    * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
    +    */
    +   @RpcMethod
    +   public void registerInfoMessageListener(final String infoMessageListenerAddress) {
    +           if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
    +                   log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
    +           } else {
    +                   Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
    +
    +                   infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
    +                           @Override
    +                           public void accept(InfoMessageListenerRpcGateway gateway) {
    +                                   log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
    +                                   infoMessageListeners.put(infoMessageListenerAddress, gateway);
    +                           }
    +                   }, getMainThreadExecutor());
    +
    +                   infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
    +                           @Override
    +                           public Void apply(Throwable failure) {
    +                                   log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
    +                                   return null;
    +                           }
    +                   }, getMainThreadExecutor());
    +           }
    +   }
    +
    +   /**
    +    * Unregisters an infoMessage listener
    +    *
    +    * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
    +    *
    +    */
    +   @RpcMethod
    +   public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
    +           infoMessageListeners.remove(infoMessageListenerAddress);
    +   }
    +
    +   /**
    +    * Shutdowns cluster
    +    *
    +    * @param finalStatus
    +    * @param optionalDiagnostics
    +    */
    +   @RpcMethod
    +   public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
    +           log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
    +           shutDownApplication(finalStatus, optionalDiagnostics);
    +   }
    +
    +   /**
    +    * This method should be called by the framework once it detects that a currently registered task executor has failed.
    +    *
    +    * @param resourceID Id of the worker that has failed.
    +    * @param message An informational message that explains why the worker failed.
    +    */
    +   public void notifyWorkerFailed(final ResourceID resourceID, String message) {
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           WorkerType worker = taskExecutorGateways.remove(resourceID);
    +                           if (worker != null) {
    +                                   // TODO :: suggest failed task executor to stop itself
    +                                   slotManager.notifyTaskManagerFailure(resourceID);
    +                           }
    +                   }
    +           });
    +   }
    +
    +   /**
    +    * Gets the number of currently started TaskManagers.
    +    *
    +    * @return The number of currently started TaskManagers.
    +    */
    +   public int getNumberOfStartedTaskManagers() {
    +           return taskExecutorGateways.size();
    +   }
    +
    +   /**
    +    * Notifies the resource manager of a fatal error.
    +    *
    +    * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
    +    * such a way that a high-availability setting would restart this or fail over
    +    * to another master.
    +    */
    +   public void onFatalError(final String message, final Throwable error) {
    +           runAsync(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                           fatalError(message, error);
    +                   }
    +           });
    +   }
    +
    +   // ------------------------------------------------------------------------
    +   //  Framework specific behavior
    +   // ------------------------------------------------------------------------
    +
    +   /**
    +    * Initializes the framework specific components.
    +    *
    +    * @throws Exception Exceptions during initialization cause the resource manager to fail.
    +    */
    +   protected abstract void initialize() throws Exception;
    +
    +   /**
    +    * Callback when a task executor register.
    +    *
    +    * @param resourceID The worker resource id
    +    * @param taskExecutorGateway the task executor gateway
    +    */
    +   protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
    --- End diff --
    
    This is missing all the other abstract methods of the old ResourceManager.
    We will need `requestNewWorkers`, `releasePendingWorker`,
    `releaseStartedWorker`, and `reacceptRegisteredWorkers`.
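
    For reference, a rough sketch of what those four hooks could look like. The signatures are assumed from memory of the old ResourceManager and may not match it exactly; `ResourceID` below is a simplified stand-in for the Flink class, and `ResourceManagerSketch` and `InMemoryResourceManager` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-in for Flink's ResourceID (assumption: only the string id matters here).
final class ResourceID {
    private final String id;
    ResourceID(String id) { this.id = id; }
    @Override public String toString() { return id; }
}

// Sketch of the framework-specific hooks named in the comment above.
// Signatures are assumed, not taken from this pull request.
abstract class ResourceManagerSketch<WorkerType> {
    // Ask the framework (e.g. YARN or Mesos) for additional workers.
    protected abstract void requestNewWorkers(int numWorkers);

    // Give back a worker that was requested but has not registered yet.
    protected abstract void releasePendingWorker(ResourceID resourceID);

    // Give back a worker that has already registered.
    protected abstract void releaseStartedWorker(WorkerType worker);

    // After a restart, decide which previously registered workers to keep.
    protected abstract Collection<WorkerType> reacceptRegisteredWorkers(Collection<ResourceID> registered);
}

// Hypothetical in-memory implementation that only records what was asked of it.
final class InMemoryResourceManager extends ResourceManagerSketch<String> {
    int requested = 0;
    final List<String> released = new ArrayList<>();

    @Override public void requestNewWorkers(int numWorkers) { requested += numWorkers; }

    @Override public void releasePendingWorker(ResourceID resourceID) {
        released.add("pending:" + resourceID);
    }

    @Override public void releaseStartedWorker(String worker) {
        released.add("started:" + worker);
    }

    @Override public Collection<String> reacceptRegisteredWorkers(Collection<ResourceID> registered) {
        List<String> accepted = new ArrayList<>();
        for (ResourceID id : registered) {
            accepted.add("worker-" + id);
        }
        return accepted;
    }
}
```

    The in-memory subclass is only there to show that the four methods are enough for a framework-specific implementation to plug in; a real YARN or Mesos implementation would talk to the cluster framework instead of recording calls.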


