[ https://issues.apache.org/jira/browse/FLINK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15449232#comment-15449232 ]
ASF GitHub Bot commented on FLINK-4516:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2427#discussion_r76811799
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---
@@ -116,4 +142,78 @@ public SlotAssignment requestSlot(SlotRequest slotRequest) {
return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
}
+
+ /**
+ * Callback method when current resourceManager is granted leadership
+ *
+ * @param newLeaderSessionID unique leadershipID
+ */
+ void handleGrantLeadership(final UUID newLeaderSessionID) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+ leaderSessionID = newLeaderSessionID;
+ // confirming the leader session ID might be blocking,
+ leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+ }
+ });
+ }
+
+ /**
+ * Callback method when current resourceManager lose leadership.
+ */
+ void handleRevokeLeadership() {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.info("ResourceManager {} was revoked leadership.", getAddress());
+ jobMasterGateways.clear();
+ leaderSessionID = null;
+ }
+ });
+ }
+
+ /**
+ * Callback method when an error happened to current resourceManager on leader election
+ * @param e
+ */
+ void onLeaderElectionError(final Throwable e) {
+ runAsync(new Runnable() {
+ @Override
+ public void run() {
+ log.error("ResourceManager received an error from the LeaderElectionService.", e);
+ // terminate ResourceManager in case of an error
+ shutDown();
+ }
+ });
+ }
+
+ private class ResourceManagerLeaderContender implements LeaderContender {
+
+ @Override
+ public void grantLeadership(UUID leaderSessionID) {
+ handleGrantLeadership(leaderSessionID);
--- End diff --
I wonder why you call methods in the main class here. Couldn't all the code in
the methods `handleGrantLeadership`, `handleRevokeLeadership`, and
`onLeaderElectionError` simply be implemented in this class? Of course, they
should still call `runAsync` so that the code executes in the main RPC thread.
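
To illustrate the suggestion, here is a minimal, self-contained sketch rather than the actual Flink code. `ResourceManagerSketch`, `LeaderContenderSketch`, the queue-based `runAsync`, and `drainMainThread` are hypothetical stand-ins for the RPC endpoint and its main thread; only the callback names mirror the diff. The inner class holds the callback logic itself and still hands each action to `runAsync`:

```java
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the ResourceManager endpoint; not the real Flink class.
class ResourceManagerSketch {

    // Simplified contender interface, assumed for illustration only.
    interface LeaderContenderSketch {
        void grantLeadership(UUID leaderSessionID);
        void revokeLeadership();
        void handleError(Exception e);
    }

    // Stand-in for the main RPC thread: actions are queued here and
    // executed one by one when drainMainThread() is called.
    private final Queue<Runnable> mainThreadQueue = new ConcurrentLinkedQueue<>();

    private UUID leaderSessionID;
    private boolean shutDown;

    private final LeaderContenderSketch contender = new ResourceManagerLeaderContender();

    // Schedules an action for the (simulated) main thread.
    void runAsync(Runnable action) {
        mainThreadQueue.add(action);
    }

    // Runs every queued action, as the main thread would.
    void drainMainThread() {
        Runnable action;
        while ((action = mainThreadQueue.poll()) != null) {
            action.run();
        }
    }

    LeaderContenderSketch getContender() { return contender; }
    UUID getLeaderSessionID() { return leaderSessionID; }
    boolean isShutDown() { return shutDown; }

    // The callbacks live in the contender; no handleXxx methods on the outer class.
    private class ResourceManagerLeaderContender implements LeaderContenderSketch {

        @Override
        public void grantLeadership(final UUID newLeaderSessionID) {
            // State is only touched inside the action run by the main thread.
            runAsync(() -> leaderSessionID = newLeaderSessionID);
        }

        @Override
        public void revokeLeadership() {
            runAsync(() -> leaderSessionID = null);
        }

        @Override
        public void handleError(final Exception e) {
            // Terminate in case of a leader election error.
            runAsync(() -> shutDown = true);
        }
    }
}
```

In this arrangement the outer class exposes no extra package-private `handle...` methods, and all state changes still happen only when the queued actions run.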
> ResourceManager leadership election
> -----------------------------------
>
> Key: FLINK-4516
> URL: https://issues.apache.org/jira/browse/FLINK-4516
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhangjing
> Assignee: zhangjing
>
> 1. When a ResourceManager is started, it first starts the leader election
> service and takes part in contending for leadership.
> 2. Every ResourceManager contains a ResourceManagerLeaderContender. When it
> is granted leadership, it starts the SlotManager and its other main components.
> When its leadership is revoked, it stops all of its components and clears
> everything.