zentol commented on code in PR #22919:
URL: https://github.com/apache/flink/pull/22919#discussion_r1255497763
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -244,78 +358,89 @@ void testGrantCallWhileInstantiatingDriver() throws Exception {
@Test
void testDelayedGrantCallAfterContenderRegistration() throws Exception {
- new Context() {
- {
- runTestWithManuallyTriggeredEvents(
- executorService -> {
- // we need to close to deregister the contender
that was already
- // registered to the service
- closeLeaderElectionInBothContexts();
-
- final UUID expectedSessionID = UUID.randomUUID();
- grantLeadership(expectedSessionID);
-
- applyToBothContenderContexts(
- ctx -> {
- try (LeaderElection
anotherLeaderElection =
-
leaderElectionService.createLeaderElection(
- ctx.contenderID)) {
- final TestingContender
testingContender =
- new TestingContender(
- ctx.address,
anotherLeaderElection);
-
testingContender.startLeaderElection();
-
-
assertThat(testingContender.getLeaderSessionID())
- .as(
- "Leadership grant
was not forwarded to the contender, yet.")
- .isNull();
-
- executorService.trigger();
-
-
assertThat(testingContender.getLeaderSessionID())
- .as(
- "Leadership grant
is actually forwarded to the service.")
-
.isEqualTo(expectedSessionID);
-
- testingContender.waitForLeader();
- }
- });
- });
+ final TestingLeaderElectionDriver.Factory driverFactory =
+ new TestingLeaderElectionDriver.Factory(
+ TestingLeaderElectionDriver.newNoOpBuilder());
+ final ManuallyTriggeredScheduledExecutorService
leaderEventOperationExecutor =
+ new ManuallyTriggeredScheduledExecutorService();
+ try (final DefaultLeaderElectionService leaderElectionService =
+ new DefaultLeaderElectionService(
+ driverFactory,
+
fatalErrorHandlerExtension.getTestingFatalErrorHandler(),
+ leaderEventOperationExecutor)) {
+
+ final AtomicBoolean firstContenderReceivedGrant = new
AtomicBoolean(false);
+ final LeaderContender firstContender =
+ TestingGenericLeaderContender.newBuilder()
+ .setGrantLeadershipConsumer(
+ ignoredSessionID ->
firstContenderReceivedGrant.set(true))
+ .build();
+
+ final AtomicBoolean secondContenderReceivedGrant = new
AtomicBoolean(false);
+ final LeaderContender secondContender =
+ TestingGenericLeaderContender.newBuilder()
+ .setGrantLeadershipConsumer(
+ ignoredSessionID ->
secondContenderReceivedGrant.set(true))
+ .build();
+ try (final LeaderElection firstLeaderElection =
+
leaderElectionService.createLeaderElection("contender_id_0")) {
Review Comment:
We need to watch out here w.r.t. the `contenderId` -> `componentName` renaming (this call still passes `"contender_id_0"`).
##########
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java:
##########
@@ -119,23 +112,23 @@ public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
}
@Override
- public LeaderElection getResourceManagerLeaderElection() throws Exception {
Review Comment:
Should we also adjust the interface accordingly? :sob:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]