This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a6b64a  [FLINK-23240][runtime] Master process supports living through 
multiple leader sessions.
7a6b64a is described below

commit 7a6b64adc438ff26a1cdc809df126ab3d30c7c3d
Author: Xintong Song <[email protected]>
AuthorDate: Wed Feb 16 11:56:34 2022 +0800

    [FLINK-23240][runtime] Master process supports living through multiple 
leader sessions.
    
    This is supported on all non-Yarn deployments. The Yarn deployment is not
    supported because each AM process can register with the Yarn RM only once.
    
    close #18793
---
 .../resourcemanager/ResourceManagerFactory.java       |  5 +++++
 .../resourcemanager/ResourceManagerServiceImpl.java   | 10 +---------
 .../LeaderChangeClusterComponentsTest.java            |  8 --------
 .../ResourceManagerServiceImplTest.java               | 12 +++---------
 .../TestingResourceManagerFactory.java                | 19 +++++++++++++++++--
 .../yarn/entrypoint/YarnResourceManagerFactory.java   |  8 ++++++++
 6 files changed, 34 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index 6bb6b7b..893ad9b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -115,6 +115,11 @@ public abstract class ResourceManagerFactory<T extends 
ResourceIDRetrievable> {
                 context.getIoExecutor());
     }
 
+    /** Whether the process can keep running (rather than terminate) after losing leadership. */
+    protected boolean supportMultiLeaderSession() {
+        return true;
+    }
+
     /**
      * Configuration changes in this method will be visible to both {@link 
ResourceManager} and
      * {@link ResourceManagerRuntimeServices}. This can be overwritten by 
{@link
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
index c0c038f..e0add8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
@@ -51,9 +51,6 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /** Default implementation of {@link ResourceManagerService}. */
 public class ResourceManagerServiceImpl implements ResourceManagerService, 
LeaderContender {
 
-    public static final String ENABLE_MULTI_LEADER_SESSION_PROPERTY =
-            "flink.tests.enable-rm-multi-leader-session";
-
     private static final Logger LOG = 
LoggerFactory.getLogger(ResourceManagerServiceImpl.class);
 
     private final ResourceManagerFactory<?> resourceManagerFactory;
@@ -68,8 +65,6 @@ public class ResourceManagerServiceImpl implements 
ResourceManagerService, Leade
 
     private final Object lock = new Object();
 
-    private final boolean enableMultiLeaderSession;
-
     @GuardedBy("lock")
     private boolean running;
 
@@ -100,9 +95,6 @@ public class ResourceManagerServiceImpl implements 
ResourceManagerService, Leade
         this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();
         this.serviceTerminationFuture = new CompletableFuture<>();
 
-        this.enableMultiLeaderSession =
-                
System.getProperties().containsKey(ENABLE_MULTI_LEADER_SESSION_PROPERTY);
-
         this.running = false;
         this.leaderResourceManager = null;
         this.leaderSessionID = null;
@@ -217,7 +209,7 @@ public class ResourceManagerServiceImpl implements 
ResourceManagerService, Leade
 
                         stopLeaderResourceManager();
 
-                        if (!enableMultiLeaderSession) {
+                        if 
(!resourceManagerFactory.supportMultiLeaderSession()) {
                             closeAsync();
                         }
                     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
index 62b9fd5..e0fcb72 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.jobmaster.utils.JobResultUtils;
 import org.apache.flink.runtime.minicluster.TestingMiniCluster;
 import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.testutils.TestingUtils;
@@ -44,7 +43,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.time.Duration;
-import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.Matchers.is;
@@ -65,16 +63,12 @@ public class LeaderChangeClusterComponentsTest extends 
TestLogger {
 
     private static EmbeddedHaServicesWithLeadershipControl 
highAvailabilityServices;
 
-    private static Properties sysProps;
-
     private JobGraph jobGraph;
 
     private JobID jobId;
 
     @BeforeClass
     public static void setupClass() throws Exception {
-        sysProps = System.getProperties();
-        
System.setProperty(ResourceManagerServiceImpl.ENABLE_MULTI_LEADER_SESSION_PROPERTY,
 "");
 
         highAvailabilityServices =
                 new 
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
@@ -102,8 +96,6 @@ public class LeaderChangeClusterComponentsTest extends 
TestLogger {
         if (miniCluster != null) {
             miniCluster.close();
         }
-
-        System.setProperties(sysProps);
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index c08deb1..3242fc7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -39,7 +39,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ForkJoinPool;
@@ -70,8 +69,6 @@ public class ResourceManagerServiceImplTest extends 
TestLogger {
     private TestingLeaderElectionService leaderElectionService;
     private ResourceManagerServiceImpl resourceManagerService;
 
-    private Properties sysProps;
-
     @BeforeClass
     public static void setupClass() {
         rpcService = new TestingRpcService();
@@ -81,8 +78,6 @@ public class ResourceManagerServiceImplTest extends 
TestLogger {
 
     @Before
     public void setup() throws Exception {
-        sysProps = System.getProperties();
-        
System.setProperty(ResourceManagerServiceImpl.ENABLE_MULTI_LEADER_SESSION_PROPERTY,
 "");
 
         fatalErrorHandler.clearError();
 
@@ -105,8 +100,6 @@ public class ResourceManagerServiceImplTest extends 
TestLogger {
         if (fatalErrorHandler.hasExceptionOccurred()) {
             fatalErrorHandler.rethrowError();
         }
-
-        System.setProperties(sysProps);
     }
 
     @AfterClass
@@ -335,8 +328,9 @@ public class ResourceManagerServiceImplTest extends 
TestLogger {
     }
 
     @Test
-    public void revokeLeadership_terminateService_multiLeaderSessionDisabled() 
throws Exception {
-        
System.clearProperty(ResourceManagerServiceImpl.ENABLE_MULTI_LEADER_SESSION_PROPERTY);
+    public void 
revokeLeadership_terminateService_multiLeaderSessionNotSupported()
+            throws Exception {
+        rmFactoryBuilder.setSupportMultiLeaderSession(false);
 
         createAndStartResourceManager();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index be76ee6..5794748 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -51,17 +51,20 @@ public class TestingResourceManagerFactory extends 
ResourceManagerFactory<Resour
             internalDeregisterApplicationConsumer;
     private final BiFunction<ResourceManager<?>, CompletableFuture<Void>, 
CompletableFuture<Void>>
             getTerminationFutureFunction;
+    private final boolean supportMultiLeaderSession;
 
     public TestingResourceManagerFactory(
             Consumer<UUID> initializeConsumer,
             Consumer<UUID> terminateConsumer,
             TriConsumer<UUID, ApplicationStatus, String> 
internalDeregisterApplicationConsumer,
             BiFunction<ResourceManager<?>, CompletableFuture<Void>, 
CompletableFuture<Void>>
-                    getTerminationFutureFunction) {
+                    getTerminationFutureFunction,
+            boolean supportMultiLeaderSession) {
         this.initializeConsumer = initializeConsumer;
         this.terminateConsumer = terminateConsumer;
         this.internalDeregisterApplicationConsumer = 
internalDeregisterApplicationConsumer;
         this.getTerminationFutureFunction = getTerminationFutureFunction;
+        this.supportMultiLeaderSession = supportMultiLeaderSession;
     }
 
     @Override
@@ -101,6 +104,11 @@ public class TestingResourceManagerFactory extends 
ResourceManagerFactory<Resour
                 
.createResourceManagerRuntimeServicesConfiguration(configuration);
     }
 
+    @Override
+    public boolean supportMultiLeaderSession() {
+        return supportMultiLeaderSession;
+    }
+
     public static class Builder {
         private Consumer<UUID> initializeConsumer = (ignore) -> {};
         private Consumer<UUID> terminateConsumer = (ignore) -> {};
@@ -109,6 +117,7 @@ public class TestingResourceManagerFactory extends 
ResourceManagerFactory<Resour
         private BiFunction<ResourceManager<?>, CompletableFuture<Void>, 
CompletableFuture<Void>>
                 getTerminationFutureFunction =
                         (rm, superTerminationFuture) -> superTerminationFuture;
+        private boolean supportMultiLeaderSession = true;
 
         public Builder setInitializeConsumer(Consumer<UUID> 
initializeConsumer) {
             this.initializeConsumer = initializeConsumer;
@@ -134,12 +143,18 @@ public class TestingResourceManagerFactory extends 
ResourceManagerFactory<Resour
             return this;
         }
 
+        public Builder setSupportMultiLeaderSession(boolean 
supportMultiLeaderSession) {
+            this.supportMultiLeaderSession = supportMultiLeaderSession;
+            return this;
+        }
+
         public TestingResourceManagerFactory build() {
             return new TestingResourceManagerFactory(
                     initializeConsumer,
                     terminateConsumer,
                     internalDeregisterApplicationConsumer,
-                    getTerminationFutureFunction);
+                    getTerminationFutureFunction,
+                    supportMultiLeaderSession);
         }
     }
 
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index b1ef748..850cfc2 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -65,4 +65,12 @@ public class YarnResourceManagerFactory extends 
ActiveResourceManagerFactory<Yar
         return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(
                 configuration, YarnWorkerResourceSpecFactory.INSTANCE);
     }
+
+    @Override
+    public boolean supportMultiLeaderSession() {
+        // Multiple leader sessions are not supported by the Yarn deployment, because the Flink RM
+        // relies on the registration response from the Yarn RM to recover previous resources, but
+        // Yarn only allows each AM process to register once.
+        return false;
+    }
 }

Reply via email to