[ https://issues.apache.org/jira/browse/FLINK-4606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525719#comment-15525719 ]
ASF GitHub Bot commented on FLINK-4606:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2540#discussion_r80663028
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -324,6 +337,158 @@ public void handleError(final Exception exception) {
shutDown();
}
+ /**
+ * Registers an infoMessage listener
+ *
+ * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+ */
+ @RpcMethod
+ public void registerInfoMessageListener(final String infoMessageListenerAddress) {
+ if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+ log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
+ } else {
+ Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
+
+ infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
+ @Override
+ public void accept(InfoMessageListenerRpcGateway gateway) {
+ log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
+ infoMessageListeners.put(infoMessageListenerAddress, gateway);
+ }
+ }, getMainThreadExecutor());
+
+ infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+ @Override
+ public Void apply(Throwable failure) {
+ log.warn("Receive a registration from unreachable info message listener on ({})", infoMessageListenerAddress);
+ return null;
+ }
+ }, getMainThreadExecutor());
+ }
+ }
+
+ /**
+ * Unregisters an infoMessage listener
+ *
+ * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager
+ *
+ */
+ @RpcMethod
+ public void unRegisterInfoMessageListener(final String infoMessageListenerAddress) {
+ infoMessageListeners.remove(infoMessageListenerAddress);
+ }
+
+ /**
+ * Shuts down the cluster.
+ *
+ * @param finalStatus
+ * @param optionalDiagnostics
+ */
+ @RpcMethod
+ public void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics) {
+ log.info("shut down cluster because application is in {}, diagnostics {}", finalStatus, optionalDiagnostics);
+ shutDownApplication(finalStatus, optionalDiagnostics);
+ }
+
+ /**
+ * This method should be called by the framework once it detects that a currently registered task executor has failed.
+ *
+ * @param resourceID Id of the worker that has failed.
+ * @param message An informational message that explains why the worker failed.
+ */
+ public void notifyWorkerFailed(final ResourceID resourceID, String message) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ WorkerType worker = taskExecutorGateways.remove(resourceID);
+ if (worker != null) {
+ // TODO :: suggest failed task executor to stop itself
+ slotManager.notifyTaskManagerFailure(resourceID);
+ }
+ }
+ });
+ }
+
+ /**
+ * Gets the number of currently started TaskManagers.
+ *
+ * @return The number of currently started TaskManagers.
+ */
+ public int getNumberOfStartedTaskManagers() {
+ return taskExecutorGateways.size();
+ }
+
+ /**
+ * Notifies the resource manager of a fatal error.
+ *
+ * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in
+ * such a way that a high-availability setting would restart this or fail over
+ * to another master.
+ */
+ public void onFatalError(final String message, final Throwable error) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ fatalError(message, error);
+ }
+ });
+ }
+
+ // ------------------------------------------------------------------------
+ // Framework specific behavior
+ // ------------------------------------------------------------------------
+
+ /**
+ * Initializes the framework specific components.
+ *
+ * @throws Exception Exceptions during initialization cause the resource manager to fail.
+ */
+ protected abstract void initialize() throws Exception;
+
+ /**
+ * Callback when a task executor registers.
+ *
+ * @param resourceID The worker resource id
+ * @param taskExecutorGateway the task executor gateway
+ */
+ protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway);
--- End diff --
This is missing the other abstract methods of the old ResourceManager (`FlinkResourceManager`).
We will need `requestNewWorkers`, `releasePendingWorker`,
`releaseStartedWorker`, and `reacceptRegisteredWorkers`.
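A minimal, self-contained Java sketch of the framework-specific section with the four requested hooks added might look like the following. The method names come from the comment; their signatures, the classes `SketchResourceManager`, `RecordingResourceManager` and `Main`, and the use of plain `String` IDs in place of Flink's `ResourceID` are hypothetical simplifications for illustration, not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class Main {
    public static void main(String[] args) throws Exception {
        RecordingResourceManager rm = new RecordingResourceManager();
        rm.initialize();
        rm.requestNewWorkers(2);             // pending: worker-0, worker-1
        rm.registerTaskExecutor("worker-0"); // accepted
        rm.registerTaskExecutor("unknown");  // rejected: never requested
        rm.releasePendingWorker("worker-1");
        System.out.println(rm.getNumberOfStartedTaskManagers() + " started, released " + rm.released);
    }
}

// Hypothetical, simplified stand-in for the new ResourceManager: String IDs replace
// Flink's ResourceID, and RPC, leader election and slot management are left out.
abstract class SketchResourceManager<WorkerType> {
    // Workers that have registered and been accepted, keyed by resource ID.
    protected final Map<String, WorkerType> startedWorkers = new HashMap<>();

    // Hooks already present in the diff (signatures simplified).
    protected abstract void initialize() throws Exception;
    protected abstract WorkerType workerStarted(String resourceID);

    // Hooks requested in the review: names from the comment, signatures assumed.
    // Ask the framework (e.g. YARN or Mesos) for additional workers.
    protected abstract void requestNewWorkers(int numWorkers);
    // Release a worker that was requested but has not registered yet.
    protected abstract void releasePendingWorker(String resourceID);
    // Release a worker that has already registered.
    protected abstract void releaseStartedWorker(WorkerType worker);
    // After a failover, decide which previously registered workers to keep.
    protected abstract Collection<WorkerType> reacceptRegisteredWorkers(Collection<String> registered);

    // Framework-independent registration path that delegates to workerStarted.
    public final void registerTaskExecutor(String resourceID) {
        WorkerType worker = workerStarted(resourceID);
        if (worker != null) {
            startedWorkers.put(resourceID, worker);
        }
    }

    public int getNumberOfStartedTaskManagers() {
        return startedWorkers.size();
    }
}

// Toy framework integration that only records what it was asked to do.
class RecordingResourceManager extends SketchResourceManager<String> {
    final List<String> pending = new ArrayList<>();
    final List<String> released = new ArrayList<>();
    private int nextId = 0;

    @Override
    protected void initialize() {
        // Nothing to set up in this toy version.
    }

    @Override
    protected void requestNewWorkers(int numWorkers) {
        for (int i = 0; i < numWorkers; i++) {
            pending.add("worker-" + nextId++);
        }
    }

    @Override
    protected String workerStarted(String resourceID) {
        // Accept only workers that this manager actually requested.
        return pending.remove(resourceID) ? resourceID : null;
    }

    @Override
    protected void releasePendingWorker(String resourceID) {
        if (pending.remove(resourceID)) {
            released.add(resourceID);
        }
    }

    @Override
    protected void releaseStartedWorker(String worker) {
        // The toy worker object doubles as its own resource ID.
        if (startedWorkers.remove(worker) != null) {
            released.add(worker);
        }
    }

    @Override
    protected Collection<String> reacceptRegisteredWorkers(Collection<String> registered) {
        // Toy policy: accept every previously registered worker.
        return new ArrayList<>(registered);
    }
}
```

Running it should print `1 started, released [worker-1]`. Keeping the registration bookkeeping in the base class and leaving only the framework decisions (request, release, reaccept) to subclasses follows the split the diff already draws under "Framework specific behavior".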
> Integrate the new ResourceManager with the existing FlinkResourceManager
> ------------------------------------------------------------------------
>
> Key: FLINK-4606
> URL: https://issues.apache.org/jira/browse/FLINK-4606
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhangjing
> Assignee: zhangjing
>
> Integrate the new ResourceManager with the existing FlinkResourceManager.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)