XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169638735
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
};
}
+ @Test
+ void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver =
driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ final CompletableFuture<Void> grantLeadershipFuture = new
CompletableFuture<>();
+ driver.isLeader(grantLeadershipFuture);
+
+ final TestingContender contender = new
TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ assertThat(testInstance.getLeaderSessionID())
+ .as("Leadership grant was not forwarded to the contender,
yet.")
+ .isNull();
+
+ grantLeadershipFuture.complete(null);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * Test to cover the issue described in FLINK-31814. This test could be
removed after
+ * FLINK-31814 is resolved.
+ */
+ @Test
+ void testOnRevokeCallWhileClosingService() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+ LeaderElectionEventHandler::onRevokeLeadership);
+
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver =
driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ driver.isLeader();
+
+ final TestingContender contender = new
TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * This issue can happen when the shutdown of the contender takes too long
and the leadership is
+ * re-acquired in the meantime (see FLINK-29234).
+ */
+ @Test
+ void testStopWhileHavingLeadership() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
Review Comment:
Initially, I thought of putting it in public interface but realized that
it's specific to the `DefaultLeaderElectionService` implementation and,
therefore, removed it from the `LeaderElectionService` interface. It just
didn't cross my mind to call the method in the constructor. But it's a valid
point now that's it's out of the interface. I will change it. :+1:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]