XComp commented on code in PR #22830:
URL: https://github.com/apache/flink/pull/22830#discussion_r1235122006


##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java:
##########
@@ -89,68 +92,67 @@ void testLeaderElectionAndRetrieval() throws Exception {
                                 clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
         closeables.add(configMapSharedWatcher);
 
-        final TestingLeaderElectionEventHandler electionEventHandler =
-                new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
-        closeables.add(electionEventHandler);
+        final TestingLeaderElectionListener electionEventHandler =
+                new TestingLeaderElectionListener();
+        final TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
 
         try {
-            final KubernetesLeaderElectionDriver leaderElectionDriver =
-                    new KubernetesLeaderElectionDriver(
-                            flinkKubeClient,
-                            configMapSharedWatcher,
-                            EXECUTOR_EXTENSION.getExecutor(),
+            final KubernetesMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
+                    new KubernetesMultipleComponentLeaderElectionDriver(
                             new KubernetesLeaderElectionConfiguration(
                                     configMapName, 
UUID.randomUUID().toString(), configuration),
+                            flinkKubeClient,
                             electionEventHandler,
-                            electionEventHandler::handleError);
+                            configMapSharedWatcher,
+                            EXECUTOR_EXTENSION.getExecutor(),
+                            fatalErrorHandler);
             closeables.add(leaderElectionDriver);
 
-            electionEventHandler.init(leaderElectionDriver);
-
-            final Function<TestingLeaderRetrievalEventHandler, AutoCloseable>
-                    leaderRetrievalDriverFactory =
-                            leaderRetrievalEventHandler ->
-                                    new KubernetesLeaderRetrievalDriver(
-                                            configMapSharedWatcher,
-                                            EXECUTOR_EXTENSION.getExecutor(),
-                                            configMapName,
-                                            leaderRetrievalEventHandler,
-                                            
KubernetesUtils::getLeaderInformationFromConfigMap,
-                                            
leaderRetrievalEventHandler::handleError);
+            final KubernetesMultipleComponentLeaderRetrievalDriverFactory 
driverFactory =
+                    new 
KubernetesMultipleComponentLeaderRetrievalDriverFactory(
+                            configMapSharedWatcher,
+                            EXECUTOR_EXTENSION.getExecutor(),
+                            configMapName,
+                            contenderID);
 
             final TestingLeaderRetrievalEventHandler 
firstLeaderRetrievalEventHandler =
                     new TestingLeaderRetrievalEventHandler();
-            
closeables.add(leaderRetrievalDriverFactory.apply(firstLeaderRetrievalEventHandler));
+            closeables.add(
+                    driverFactory.createLeaderRetrievalDriver(
+                            firstLeaderRetrievalEventHandler, 
fatalErrorHandler));
 
             // Wait for the driver to obtain leadership.
-            electionEventHandler.waitForLeader();
-            final LeaderInformation confirmedLeaderInformation =
-                    electionEventHandler.getConfirmedLeaderInformation();
-            
assertThat(confirmedLeaderInformation.getLeaderAddress()).isEqualTo(LEADER_ADDRESS);
+            
electionEventHandler.await(LeaderElectionEvent.IsLeaderEvent.class);
+            final UUID leaderSessionID = UUID.randomUUID();
+            leaderElectionDriver.publishLeaderInformation(
+                    contenderID, LeaderInformation.known(leaderSessionID, 
leaderAddress));
 
             // Check if the leader retrieval driver is notified about the 
leader address
-            awaitLeadership(firstLeaderRetrievalEventHandler, 
confirmedLeaderInformation);
+            awaitLeadership(firstLeaderRetrievalEventHandler, leaderSessionID, 
leaderAddress);
 
             // Start a second leader retrieval that should be notified 
immediately because we
             // already know who the leader is.
             final TestingLeaderRetrievalEventHandler 
secondRetrievalEventHandler =
                     new TestingLeaderRetrievalEventHandler();
-            
closeables.add(leaderRetrievalDriverFactory.apply(secondRetrievalEventHandler));
-            awaitLeadership(secondRetrievalEventHandler, 
confirmedLeaderInformation);
+            closeables.add(
+                    driverFactory.createLeaderRetrievalDriver(
+                            secondRetrievalEventHandler, fatalErrorHandler));
+            awaitLeadership(secondRetrievalEventHandler, leaderSessionID, 
leaderAddress);
         } finally {
             for (AutoCloseable closeable : closeables) {
                 closeable.close();
             }
+
+            Thread.sleep(1000);

Review Comment:
   That must have been a leftover from when I debugged the test. I have removed 
   it now. The test also runs without it, and I don't see any reason why we need 
   it here. Thanks for pointing me to it, and sorry for the poor preparation. The 
   branches had been left hanging for too long; I should have done a pass over it.



##########
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionAndRetrievalITCase.java:
##########
@@ -89,68 +92,67 @@ void testLeaderElectionAndRetrieval() throws Exception {
                                 clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
         closeables.add(configMapSharedWatcher);
 
-        final TestingLeaderElectionEventHandler electionEventHandler =
-                new TestingLeaderElectionEventHandler(LEADER_ADDRESS);
-        closeables.add(electionEventHandler);
+        final TestingLeaderElectionListener electionEventHandler =
+                new TestingLeaderElectionListener();
+        final TestingFatalErrorHandler fatalErrorHandler = new 
TestingFatalErrorHandler();
 
         try {
-            final KubernetesLeaderElectionDriver leaderElectionDriver =
-                    new KubernetesLeaderElectionDriver(
-                            flinkKubeClient,
-                            configMapSharedWatcher,
-                            EXECUTOR_EXTENSION.getExecutor(),
+            final KubernetesMultipleComponentLeaderElectionDriver 
leaderElectionDriver =
+                    new KubernetesMultipleComponentLeaderElectionDriver(
                             new KubernetesLeaderElectionConfiguration(
                                     configMapName, 
UUID.randomUUID().toString(), configuration),
+                            flinkKubeClient,
                             electionEventHandler,
-                            electionEventHandler::handleError);
+                            configMapSharedWatcher,
+                            EXECUTOR_EXTENSION.getExecutor(),
+                            fatalErrorHandler);
             closeables.add(leaderElectionDriver);
 
-            electionEventHandler.init(leaderElectionDriver);
-
-            final Function<TestingLeaderRetrievalEventHandler, AutoCloseable>
-                    leaderRetrievalDriverFactory =
-                            leaderRetrievalEventHandler ->
-                                    new KubernetesLeaderRetrievalDriver(
-                                            configMapSharedWatcher,
-                                            EXECUTOR_EXTENSION.getExecutor(),
-                                            configMapName,
-                                            leaderRetrievalEventHandler,
-                                            
KubernetesUtils::getLeaderInformationFromConfigMap,
-                                            
leaderRetrievalEventHandler::handleError);
+            final KubernetesMultipleComponentLeaderRetrievalDriverFactory 
driverFactory =
+                    new 
KubernetesMultipleComponentLeaderRetrievalDriverFactory(
+                            configMapSharedWatcher,
+                            EXECUTOR_EXTENSION.getExecutor(),
+                            configMapName,
+                            contenderID);
 
             final TestingLeaderRetrievalEventHandler 
firstLeaderRetrievalEventHandler =
                     new TestingLeaderRetrievalEventHandler();
-            
closeables.add(leaderRetrievalDriverFactory.apply(firstLeaderRetrievalEventHandler));
+            closeables.add(
+                    driverFactory.createLeaderRetrievalDriver(
+                            firstLeaderRetrievalEventHandler, 
fatalErrorHandler));
 
             // Wait for the driver to obtain leadership.
-            electionEventHandler.waitForLeader();
-            final LeaderInformation confirmedLeaderInformation =
-                    electionEventHandler.getConfirmedLeaderInformation();
-            
assertThat(confirmedLeaderInformation.getLeaderAddress()).isEqualTo(LEADER_ADDRESS);
+            
electionEventHandler.await(LeaderElectionEvent.IsLeaderEvent.class);
+            final UUID leaderSessionID = UUID.randomUUID();
+            leaderElectionDriver.publishLeaderInformation(
+                    contenderID, LeaderInformation.known(leaderSessionID, 
leaderAddress));
 
             // Check if the leader retrieval driver is notified about the 
leader address
-            awaitLeadership(firstLeaderRetrievalEventHandler, 
confirmedLeaderInformation);
+            awaitLeadership(firstLeaderRetrievalEventHandler, leaderSessionID, 
leaderAddress);
 
             // Start a second leader retrieval that should be notified 
immediately because we
             // already know who the leader is.
             final TestingLeaderRetrievalEventHandler 
secondRetrievalEventHandler =
                     new TestingLeaderRetrievalEventHandler();
-            
closeables.add(leaderRetrievalDriverFactory.apply(secondRetrievalEventHandler));
-            awaitLeadership(secondRetrievalEventHandler, 
confirmedLeaderInformation);
+            closeables.add(
+                    driverFactory.createLeaderRetrievalDriver(
+                            secondRetrievalEventHandler, fatalErrorHandler));
+            awaitLeadership(secondRetrievalEventHandler, leaderSessionID, 
leaderAddress);
         } finally {
             for (AutoCloseable closeable : closeables) {
                 closeable.close();
             }
+
+            Thread.sleep(1000);

Review Comment:
   That must have been a leftover from when I debugged the test. I have removed 
   it now. The test also runs without it, and I don't see any reason why we need 
   it here. Thanks for pointing me to it, and sorry for the poor preparation. The 
   branches had been left hanging for too long; I should have done a pass over it 
   before finally creating the PRs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to