XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169762851
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
};
}
+ @Test
+ void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+ new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ final CompletableFuture<Void> grantLeadershipFuture = new CompletableFuture<>();
+ driver.isLeader(grantLeadershipFuture);
+
+ final TestingContender contender = new TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ assertThat(testInstance.getLeaderSessionID())
+ .as("Leadership grant was not forwarded to the contender, yet.")
+ .isNull();
+
+ grantLeadershipFuture.complete(null);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * Test to cover the issue described in FLINK-31814. This test could be removed after
+ * FLINK-31814 is resolved.
+ */
+ @Test
+ void testOnRevokeCallWhileClosingService() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+ new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+ LeaderElectionEventHandler::onRevokeLeadership);
+
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver = driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ driver.isLeader();
+
+ final TestingContender contender = new TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * This issue can happen when the shutdown of the contender takes too long and the leadership is
+ * re-acquired in the meantime (see FLINK-29234).
+ */
+ @Test
+ void testStopWhileHavingLeadership() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+ new TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
Review Comment:
Ah, now I remember why I added this method in the first place. With the
`MultipleComponentLeaderElection*` classes, we introduced a circular dependency
between `DefaultLeaderElectionService` and
`DefaultMultipleComponentLeaderElectionService`: the latter calls
`DefaultLeaderElectionService.onGrantLeadership` while registering the service
in
[DefaultMultipleComponentLeaderElectionService:152](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L152).
That call accesses the `DefaultLeaderElectionService` instance while it is
still being instantiated.
I created FLINK-31837 to cover this issue in a follow-up task and extended
the method's javadoc accordingly.
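
For illustration, here is a minimal, self-contained sketch of the general hazard behind this, assuming a registry that calls back synchronously during registration. None of the names below (`ConstructorEscapeSketch`, `Registry`, `EagerService`, `DeferredService`) are Flink classes; they are hypothetical stand-ins, and the sketch is not the actual implementation. It only shows why moving the registration out of the constructor into a separate start method avoids a callback on a partially constructed instance.

```java
public class ConstructorEscapeSketch {

    /** Hypothetical callback interface, loosely modelled on a leadership grant. */
    interface Service {
        void onGrantLeadership();
    }

    /** Hypothetical registry that grants leadership synchronously while registering. */
    static class Registry {
        void register(Service service) {
            service.onGrantLeadership();
        }
    }

    /** Registers itself in the constructor, so the callback sees unset fields. */
    static class EagerService implements Service {
        private final String contenderId;
        boolean grantSeen;
        String observedContenderId;

        EagerService(Registry registry) {
            // 'this' escapes here, before contenderId has been assigned.
            registry.register(this);
            this.contenderId = "contender-1";
        }

        @Override
        public void onGrantLeadership() {
            grantSeen = true;
            // Still null when invoked from within the constructor.
            observedContenderId = contenderId;
        }
    }

    /** Finishes construction first; registration happens in an explicit start call. */
    static class DeferredService implements Service {
        private final Registry registry;
        private final String contenderId;
        boolean grantSeen;
        String observedContenderId;

        DeferredService(Registry registry) {
            this.registry = registry;
            this.contenderId = "contender-1";
        }

        /** Assumed analogue of a separate backend-start method. */
        void start() {
            registry.register(this);
        }

        @Override
        public void onGrantLeadership() {
            grantSeen = true;
            observedContenderId = contenderId;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();

        EagerService eager = new EagerService(registry);
        System.out.println("eager observed: " + eager.observedContenderId);

        DeferredService deferred = new DeferredService(registry);
        deferred.start();
        System.out.println("deferred observed: " + deferred.observedContenderId);
    }
}
```

In the eager variant the callback runs before the final field is assigned, so it observes `null`; in the deferred variant the instance is fully constructed before anything can call back into it.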