This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7c013cc0a2a1a5846ed389336cbe5fbecb8a0ad
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Feb 21 15:48:33 2019 +0100

    [hotfix] Allow leader assignment to TestingLeaderElectionService if it has not been started
---
 .../flink/runtime/dispatcher/DispatcherHATest.java |  3 ++
 .../flink/runtime/dispatcher/DispatcherTest.java   |  2 +-
 .../TestingLeaderElectionService.java              | 48 +++++++++++++++-------
 3 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
index d71804b..711694b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java
@@ -135,6 +135,9 @@ public class DispatcherHATest extends TestLogger {
                dispatcher.start();
 
                try {
+                       // wait until the election service has been started
+                       dispatcherLeaderElectionService.getStartFuture().get();
+
                        final UUID leaderId = UUID.randomUUID();
                        dispatcherLeaderElectionService.isLeader(leaderId);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 2c3712f..f0c2772 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -266,7 +266,7 @@ public class DispatcherTest extends TestLogger {
 
                assertTrue(
                        "jobManagerRunner was not started",
-                       dispatcherLeaderElectionService.isStarted());
+                       
dispatcherLeaderElectionService.getStartFuture().isDone());
        }
 
        /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index 25f97c0..e60f2f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -29,9 +29,10 @@ import java.util.concurrent.CompletableFuture;
  */
 public class TestingLeaderElectionService implements LeaderElectionService {
 
-       private LeaderContender contender;
+       private LeaderContender contender = null;
        private boolean hasLeadership = false;
        private CompletableFuture<UUID> confirmationFuture = null;
+       private CompletableFuture<Void> startFuture = new CompletableFuture<>();
        private UUID issuedLeaderSessionId = null;
 
        /**
@@ -44,13 +45,25 @@ public class TestingLeaderElectionService implements LeaderElectionService {
        }
 
        @Override
-       public synchronized void start(LeaderContender contender) throws 
Exception {
+       public synchronized void start(LeaderContender contender) {
+               assert(!getStartFuture().isDone());
+
                this.contender = contender;
+
+               if (hasLeadership) {
+                       contender.grantLeadership(issuedLeaderSessionId);
+               }
+
+               startFuture.complete(null);
        }
 
        @Override
        public synchronized void stop() throws Exception {
-
+               contender = null;
+               hasLeadership = false;
+               issuedLeaderSessionId = null;
+               startFuture.cancel(false);
+               startFuture = new CompletableFuture<>();
        }
 
        @Override
@@ -72,31 +85,38 @@ public class TestingLeaderElectionService implements LeaderElectionService {
                confirmationFuture = new CompletableFuture<>();
                hasLeadership = true;
                issuedLeaderSessionId = leaderSessionID;
-               contender.grantLeadership(leaderSessionID);
+
+               if (contender != null) {
+                       contender.grantLeadership(leaderSessionID);
+               }
 
                return confirmationFuture;
        }
 
        public synchronized void notLeader() {
                hasLeadership = false;
-               contender.revokeLeadership();
-       }
 
-       public synchronized void reset() {
-               contender = null;
-               hasLeadership  = false;
+               if (contender != null) {
+                       contender.revokeLeadership();
+               }
        }
 
        public synchronized String getAddress() {
-               return contender.getAddress();
+               if (contender != null) {
+                       return contender.getAddress();
+               } else {
+                       throw new 
IllegalStateException("TestingLeaderElectionService has not been started.");
+               }
        }
 
        /**
-        * Returns <code>true</code> if {@link #start(LeaderContender)} was 
called,
-        * <code>false</code> otherwise.
+        * Returns the start future indicating whether this leader election 
service
+        * has been started or not.
+        *
+        * @return Future which is completed once this service has been started
         */
-       public synchronized boolean isStarted() {
-               return contender != null;
+       public synchronized CompletableFuture<Void> getStartFuture() {
+               return startFuture;
        }
 
 }

Reply via email to