XComp commented on code in PR #22830:
URL: https://github.com/apache/flink/pull/22830#discussion_r1235120950
##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java:
##########
@@ -89,68 +92,67 @@ void testLeaderElectionAndRetrieval() throws Exception {
clusterId,
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
closeables.add(configMapSharedWatcher);
- final TestingLeaderElectionEventHandler electionEventHandler =
- new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
- closeables.add(electionEventHandler);
+ final TestingLeaderElectionListener electionEventHandler =
+ new TestingLeaderElectionListener();
+ final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
try {
- final KubernetesLeaderElectionDriver leaderElectionDriver =
- new KubernetesLeaderElectionDriver(
- flinkKubeClient,
- configMapSharedWatcher,
- EXECUTOR_EXTENSION.getExecutor(),
+ final KubernetesMultipleComponentLeaderElectionDriver leaderElectionDriver =
+ new KubernetesMultipleComponentLeaderElectionDriver(
new KubernetesLeaderElectionConfiguration(
configMapName,
UUID.randomUUID().toString(), configuration),
+ flinkKubeClient,
electionEventHandler,
- electionEventHandler::handleError);
+ configMapSharedWatcher,
+ EXECUTOR_EXTENSION.getExecutor(),
+ fatalErrorHandler);
closeables.add(leaderElectionDriver);
- electionEventHandler.init(leaderElectionDriver);
-
- final Function<TestingLeaderRetrievalEventHandler, AutoCloseable>
- leaderRetrievalDriverFactory =
- leaderRetrievalEventHandler ->
- new KubernetesLeaderRetrievalDriver(
- configMapSharedWatcher,
- EXECUTOR_EXTENSION.getExecutor(),
- configMapName,
- leaderRetrievalEventHandler,
- KubernetesUtils::getLeaderInformationFromConfigMap,
- leaderRetrievalEventHandler::handleError);
+ final KubernetesMultipleComponentLeaderRetrievalDriverFactory driverFactory =
+ new KubernetesMultipleComponentLeaderRetrievalDriverFactory(
+ configMapSharedWatcher,
+ EXECUTOR_EXTENSION.getExecutor(),
+ configMapName,
+ contenderID);
final TestingLeaderRetrievalEventHandler firstLeaderRetrievalEventHandler =
new TestingLeaderRetrievalEventHandler();
- closeables.add(leaderRetrievalDriverFactory.apply(firstLeaderRetrievalEventHandler));
+ closeables.add(
+ driverFactory.createLeaderRetrievalDriver(
+ firstLeaderRetrievalEventHandler, fatalErrorHandler));
// Wait for the driver to obtain leadership.
- electionEventHandler.waitForLeader();
- final LeaderInformation confirmedLeaderInformation =
- electionEventHandler.getConfirmedLeaderInformation();
- assertThat(confirmedLeaderInformation.getLeaderAddress()).isEqualTo(LEADER_ADDRESS);
+ electionEventHandler.await(LeaderElectionEvent.IsLeaderEvent.class);
+ final UUID leaderSessionID = UUID.randomUUID();
+ leaderElectionDriver.publishLeaderInformation(
+ contenderID, LeaderInformation.known(leaderSessionID, leaderAddress));
// Check if the leader retrieval driver is notified about the leader address
- awaitLeadership(firstLeaderRetrievalEventHandler, confirmedLeaderInformation);
+ awaitLeadership(firstLeaderRetrievalEventHandler, leaderSessionID, leaderAddress);
// Start a second leader retrieval that should be notified immediately because we
// already know who the leader is.
final TestingLeaderRetrievalEventHandler secondRetrievalEventHandler =
new TestingLeaderRetrievalEventHandler();
- closeables.add(leaderRetrievalDriverFactory.apply(secondRetrievalEventHandler));
- awaitLeadership(secondRetrievalEventHandler, confirmedLeaderInformation);
+ closeables.add(
+ driverFactory.createLeaderRetrievalDriver(
+ secondRetrievalEventHandler, fatalErrorHandler));
+ awaitLeadership(secondRetrievalEventHandler, leaderSessionID, leaderAddress);
} finally {
for (AutoCloseable closeable : closeables) {
closeable.close();
}
+
+ Thread.sleep(1000);
flinkKubeClient.deleteConfigMap(configMapName).get();
}
}
private static void awaitLeadership(
- TestingLeaderRetrievalEventHandler handler, LeaderInformation leaderInformation)
+ TestingLeaderRetrievalEventHandler handler, UUID leaderSessionID, String leaderAddress)
Review Comment:
:+1: I reverted that change
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]