This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 2465c850 [FLINK-23240][runtime] Master process supports living through
multiple leader sessions.
2465c850 is described below
commit 2465c850f48497cdb19edd10240ce3e8c8fe71de
Author: Xintong Song <[email protected]>
AuthorDate: Wed Feb 16 11:56:34 2022 +0800
[FLINK-23240][runtime] Master process supports living through multiple
leader sessions.
This is supported on all non-Yarn deployments. The Yarn deployment is not
supported because each AM process can only register with the Yarn RM once.
close #18839
---
.../resourcemanager/ResourceManagerFactory.java | 5 +++++
.../resourcemanager/ResourceManagerServiceImpl.java | 10 +---------
.../LeaderChangeClusterComponentsTest.java | 8 --------
.../ResourceManagerServiceImplTest.java | 12 +++---------
.../TestingResourceManagerFactory.java | 19 +++++++++++++++++--
.../yarn/entrypoint/YarnResourceManagerFactory.java | 8 ++++++++
6 files changed, 34 insertions(+), 28 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
index fd8a9f6..fab804b 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerFactory.java
@@ -114,6 +114,11 @@ public abstract class ResourceManagerFactory<T extends
ResourceIDRetrievable> {
context.getIoExecutor());
}
+ /** Whether the process can keep running, instead of terminating, after losing leadership. */
+ protected boolean supportMultiLeaderSession() {
+ return true;
+ }
+
/**
* Configuration changes in this method will be visible to both {@link
ResourceManager} and
* {@link ResourceManagerRuntimeServices}. This can be overwritten by
{@link
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
index 2486377..8271528 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
@@ -51,9 +51,6 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
/** Default implementation of {@link ResourceManagerService}. */
public class ResourceManagerServiceImpl implements ResourceManagerService,
LeaderContender {
- public static final String ENABLE_MULTI_LEADER_SESSION_PROPERTY =
- "flink.tests.enable-rm-multi-leader-session";
-
private static final Logger LOG =
LoggerFactory.getLogger(ResourceManagerServiceImpl.class);
private final ResourceManagerFactory<?> resourceManagerFactory;
@@ -68,8 +65,6 @@ public class ResourceManagerServiceImpl implements
ResourceManagerService, Leade
private final Object lock = new Object();
- private final boolean enableMultiLeaderSession;
-
@GuardedBy("lock")
private boolean running;
@@ -100,9 +95,6 @@ public class ResourceManagerServiceImpl implements
ResourceManagerService, Leade
this.handleLeaderEventExecutor = Executors.newSingleThreadExecutor();
this.serviceTerminationFuture = new CompletableFuture<>();
- this.enableMultiLeaderSession =
-
System.getProperties().containsKey(ENABLE_MULTI_LEADER_SESSION_PROPERTY);
-
this.running = false;
this.leaderResourceManager = null;
this.leaderSessionID = null;
@@ -217,7 +209,7 @@ public class ResourceManagerServiceImpl implements
ResourceManagerService, Leade
stopLeaderResourceManager();
- if (!enableMultiLeaderSession) {
+ if
(!resourceManagerFactory.supportMultiLeaderSession()) {
closeAsync();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
index 09f09ce..1b1250d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.utils.JobResultUtils;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -44,7 +43,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.time.Duration;
-import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.Matchers.is;
@@ -65,16 +63,12 @@ public class LeaderChangeClusterComponentsTest extends
TestLogger {
private static EmbeddedHaServicesWithLeadershipControl
highAvailabilityServices;
- private static Properties sysProps;
-
private JobGraph jobGraph;
private JobID jobId;
@BeforeClass
public static void setupClass() throws Exception {
- sysProps = System.getProperties();
-
System.setProperty(ResourceManagerServiceImpl.ENABLE_MULTI_LEADER_SESSION_PROPERTY,
"");
highAvailabilityServices =
new
EmbeddedHaServicesWithLeadershipControl(TestingUtils.defaultExecutor());
@@ -102,8 +96,6 @@ public class LeaderChangeClusterComponentsTest extends
TestLogger {
if (miniCluster != null) {
miniCluster.close();
}
-
- System.setProperties(sysProps);
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 69100a6..f94c51a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -38,7 +38,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
@@ -69,8 +68,6 @@ public class ResourceManagerServiceImplTest extends
TestLogger {
private TestingLeaderElectionService leaderElectionService;
private ResourceManagerServiceImpl resourceManagerService;
- private Properties sysProps;
-
@BeforeClass
public static void setupClass() {
rpcService = new TestingRpcService();
@@ -80,8 +77,6 @@ public class ResourceManagerServiceImplTest extends
TestLogger {
@Before
public void setup() throws Exception {
- sysProps = System.getProperties();
-
System.setProperty(ResourceManagerServiceImpl.ENABLE_MULTI_LEADER_SESSION_PROPERTY,
"");
fatalErrorHandler.clearError();
@@ -104,8 +99,6 @@ public class ResourceManagerServiceImplTest extends
TestLogger {
if (fatalErrorHandler.hasExceptionOccurred()) {
fatalErrorHandler.rethrowError();
}
-
- System.setProperties(sysProps);
}
@AfterClass
@@ -333,8 +326,9 @@ public class ResourceManagerServiceImplTest extends
TestLogger {
}
@Test
- public void revokeLeadership_terminateService_multiLeaderSessionDisabled()
throws Exception {
-
System.clearProperty(ResourceManagerServiceImpl.ENABLE_MULTI_LEADER_SESSION_PROPERTY);
+ public void
revokeLeadership_terminateService_multiLeaderSessionNotSupported()
+ throws Exception {
+ rmFactoryBuilder.setSupportMultiLeaderSession(false);
createAndStartResourceManager();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
index be76ee6..5794748 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManagerFactory.java
@@ -51,17 +51,20 @@ public class TestingResourceManagerFactory extends
ResourceManagerFactory<Resour
internalDeregisterApplicationConsumer;
private final BiFunction<ResourceManager<?>, CompletableFuture<Void>,
CompletableFuture<Void>>
getTerminationFutureFunction;
+ private final boolean supportMultiLeaderSession;
public TestingResourceManagerFactory(
Consumer<UUID> initializeConsumer,
Consumer<UUID> terminateConsumer,
TriConsumer<UUID, ApplicationStatus, String>
internalDeregisterApplicationConsumer,
BiFunction<ResourceManager<?>, CompletableFuture<Void>,
CompletableFuture<Void>>
- getTerminationFutureFunction) {
+ getTerminationFutureFunction,
+ boolean supportMultiLeaderSession) {
this.initializeConsumer = initializeConsumer;
this.terminateConsumer = terminateConsumer;
this.internalDeregisterApplicationConsumer =
internalDeregisterApplicationConsumer;
this.getTerminationFutureFunction = getTerminationFutureFunction;
+ this.supportMultiLeaderSession = supportMultiLeaderSession;
}
@Override
@@ -101,6 +104,11 @@ public class TestingResourceManagerFactory extends
ResourceManagerFactory<Resour
.createResourceManagerRuntimeServicesConfiguration(configuration);
}
+ @Override
+ public boolean supportMultiLeaderSession() {
+ return supportMultiLeaderSession;
+ }
+
public static class Builder {
private Consumer<UUID> initializeConsumer = (ignore) -> {};
private Consumer<UUID> terminateConsumer = (ignore) -> {};
@@ -109,6 +117,7 @@ public class TestingResourceManagerFactory extends
ResourceManagerFactory<Resour
private BiFunction<ResourceManager<?>, CompletableFuture<Void>,
CompletableFuture<Void>>
getTerminationFutureFunction =
(rm, superTerminationFuture) -> superTerminationFuture;
+ private boolean supportMultiLeaderSession = true;
public Builder setInitializeConsumer(Consumer<UUID>
initializeConsumer) {
this.initializeConsumer = initializeConsumer;
@@ -134,12 +143,18 @@ public class TestingResourceManagerFactory extends
ResourceManagerFactory<Resour
return this;
}
+ public Builder setSupportMultiLeaderSession(boolean
supportMultiLeaderSession) {
+ this.supportMultiLeaderSession = supportMultiLeaderSession;
+ return this;
+ }
+
public TestingResourceManagerFactory build() {
return new TestingResourceManagerFactory(
initializeConsumer,
terminateConsumer,
internalDeregisterApplicationConsumer,
- getTerminationFutureFunction);
+ getTerminationFutureFunction,
+ supportMultiLeaderSession);
}
}
diff --git
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
index b1ef748..850cfc2 100644
---
a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
+++
b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
@@ -65,4 +65,12 @@ public class YarnResourceManagerFactory extends
ActiveResourceManagerFactory<Yar
return ResourceManagerRuntimeServicesConfiguration.fromConfiguration(
configuration, YarnWorkerResourceSpecFactory.INSTANCE);
}
+
+ @Override
+ public boolean supportMultiLeaderSession() {
+ // Multiple leader sessions are not supported by the Yarn deployment, because the Flink RM
+ // relies on the registration response from the Yarn RM to recover previous resources, but
+ // Yarn only allows each AM process to register once.
+ return false;
+ }
}