This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b7c013cc0a2a1a5846ed389336cbe5fbecb8a0ad Author: Till Rohrmann <[email protected]> AuthorDate: Thu Feb 21 15:48:33 2019 +0100 [hotfix] Allow leader assignment to TestingLeaderElectionService if it has not been started --- .../flink/runtime/dispatcher/DispatcherHATest.java | 3 ++ .../flink/runtime/dispatcher/DispatcherTest.java | 2 +- .../TestingLeaderElectionService.java | 48 +++++++++++++++------- 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java index d71804b..711694b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherHATest.java @@ -135,6 +135,9 @@ public class DispatcherHATest extends TestLogger { dispatcher.start(); try { + // wait until the election service has been started + dispatcherLeaderElectionService.getStartFuture().get(); + final UUID leaderId = UUID.randomUUID(); dispatcherLeaderElectionService.isLeader(leaderId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 2c3712f..f0c2772 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -266,7 +266,7 @@ public class DispatcherTest extends TestLogger { assertTrue( "jobManagerRunner was not started", - dispatcherLeaderElectionService.isStarted()); + dispatcherLeaderElectionService.getStartFuture().isDone()); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java index 25f97c0..e60f2f7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java @@ -29,9 +29,10 @@ import java.util.concurrent.CompletableFuture; */ public class TestingLeaderElectionService implements LeaderElectionService { - private LeaderContender contender; + private LeaderContender contender = null; private boolean hasLeadership = false; private CompletableFuture<UUID> confirmationFuture = null; + private CompletableFuture<Void> startFuture = new CompletableFuture<>(); private UUID issuedLeaderSessionId = null; /** @@ -44,13 +45,25 @@ public class TestingLeaderElectionService implements LeaderElectionService { } @Override - public synchronized void start(LeaderContender contender) throws Exception { + public synchronized void start(LeaderContender contender) { + assert(!getStartFuture().isDone()); + this.contender = contender; + + if (hasLeadership) { + contender.grantLeadership(issuedLeaderSessionId); + } + + startFuture.complete(null); } @Override public synchronized void stop() throws Exception { - + contender = null; + hasLeadership = false; + issuedLeaderSessionId = null; + startFuture.cancel(false); + startFuture = new CompletableFuture<>(); } @Override @@ -72,31 +85,38 @@ public class TestingLeaderElectionService implements LeaderElectionService { confirmationFuture = new CompletableFuture<>(); hasLeadership = true; issuedLeaderSessionId = leaderSessionID; - contender.grantLeadership(leaderSessionID); + + if (contender != null) { + contender.grantLeadership(leaderSessionID); + } return confirmationFuture; } public synchronized void notLeader() { hasLeadership = false; - contender.revokeLeadership(); - } - public synchronized void reset() { - contender = null; - hasLeadership = false; + if (contender != null) { + contender.revokeLeadership(); + } } public synchronized String getAddress() { - return contender.getAddress(); + if (contender != null) { + return contender.getAddress(); + } else { + throw new IllegalStateException("TestingLeaderElectionService has not been started."); + } } /** - * Returns <code>true</code> if {@link #start(LeaderContender)} was called, - * <code>false</code> otherwise. + * Returns the start future indicating whether this leader election service + * has been started or not. + * + * @return Future which is completed once this service has been started */ - public synchronized boolean isStarted() { - return contender != null; + public synchronized CompletableFuture<Void> getStartFuture() { + return startFuture; } }
