Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2655#discussion_r84063789
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
---
@@ -408,106 +528,98 @@ public void run() {
*/
@Override
public void handleError(final Exception exception) {
- log.error("ResourceManager received an error from the LeaderElectionService.", exception);
- // terminate ResourceManager in case of an error
- shutDown();
+ onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
}
/**
- * Registers an infoMessage listener
+ * This method should be called by the framework once it detects that a currently registered
+ * task executor has failed.
*
- * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+ * @param resourceID Id of the worker that has failed.
+ * @param message An informational message that explains why the worker failed.
*/
- @RpcMethod
- public void registerInfoMessageListener(final String infoMessageListenerAddress) {
- if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
- log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
- } else {
- Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
-
- infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
- @Override
- public void accept(InfoMessageListenerRpcGateway gateway) {
- log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
- infoMessageListeners.put(infoMessageListenerAddress, gateway);
- }
- }, getMainThreadExecutor());
+ public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
+ runAsync(new Runnable() {
--- End diff --
I actually just added logging. I guess one has to add the registration ID
of the TM to filter out outdated notifyWorkerFailed calls.
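
A minimal sketch of how such filtering could look, not the actual ResourceManager code. `WorkerRegistry`, `register`, and the three-argument `notifyWorkerFailed` are hypothetical names, plain strings stand in for `ResourceID`, and a `UUID` stands in for the registration ID. It assumes all calls run on a single thread, as they would when dispatched through `runAsync`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in for the ResourceManager's worker bookkeeping. */
final class WorkerRegistry {
    // Latest registration id per worker; a String stands in for ResourceID.
    private final Map<String, UUID> registrations = new HashMap<>();

    /** Registers (or re-registers) a worker and returns its fresh registration id. */
    UUID register(String resourceId) {
        UUID registrationId = UUID.randomUUID();
        registrations.put(resourceId, registrationId);
        return registrationId;
    }

    /**
     * Handles a failure notification only if it refers to the current registration.
     * Returns true if the worker was removed, false if the call was outdated or unknown.
     */
    boolean notifyWorkerFailed(String resourceId, UUID registrationId, String message) {
        UUID current = registrations.get(resourceId);
        if (current == null || !current.equals(registrationId)) {
            // The worker re-registered in the meantime or was already removed.
            System.out.println("Ignoring outdated failure notification for " + resourceId);
            return false;
        }
        System.out.println("Worker " + resourceId + " failed: " + message);
        registrations.remove(resourceId);
        return true;
    }

    boolean isRegistered(String resourceId) {
        return registrations.containsKey(resourceId);
    }
}
```

With this shape, a failure notification that carries the registration ID of an earlier registration is dropped, while one that carries the current ID removes the worker.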