xintongsong commented on a change in pull request #15524:
URL: https://github.com/apache/flink/pull/15524#discussion_r636600545



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
##########
@@ -18,63 +18,101 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.FlinkException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Default implementation of {@link ResourceManagerService}. */
-public class ResourceManagerServiceImpl implements ResourceManagerService {
+public class ResourceManagerServiceImpl implements ResourceManagerService, 
LeaderContender {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResourceManagerServiceImpl.class);
 
     private final ResourceManagerFactory<?> resourceManagerFactory;
     private final ResourceManagerProcessContext rmProcessContext;
 
+    private final LeaderElectionService leaderElectionService;
+    private final FatalErrorHandler fatalErrorHandler;
+    private final Executor ioExecutor;
+
+    private final Executor handleLeaderEventExecutor;
     private final CompletableFuture<Void> terminationFuture;
 
     private final Object lock = new Object();
 
+    @GuardedBy("lock")
+    private boolean running;
+
     @Nullable
     @GuardedBy("lock")
     private ResourceManager<?> resourceManager;
 
+    @Nullable
+    @GuardedBy("lock")
+    private UUID leaderSessionID;
+
     private ResourceManagerServiceImpl(
             ResourceManagerFactory<?> resourceManagerFactory,
             ResourceManagerProcessContext rmProcessContext) {
         this.resourceManagerFactory = checkNotNull(resourceManagerFactory);
         this.rmProcessContext = checkNotNull(rmProcessContext);
 
+        this.leaderElectionService =
+                rmProcessContext
+                        .getHighAvailabilityServices()
+                        .getResourceManagerLeaderElectionService();
+        this.fatalErrorHandler = rmProcessContext.getFatalErrorHandler();
+        this.ioExecutor = rmProcessContext.getIoExecutor();
+
+        this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();

Review comment:
       True, it should be properly shut down.
   
   Yes. `grantLeadership` and `revokeLeadership` are invoked from the leader 
election service's thread. Starting or stopping a ResourceManager may involve 
external interactions, so I think it would be better not to block the leader 
election service with such operations.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to