Repository: flink Updated Branches: refs/heads/master 4d8cbec4f -> 1a6bab3ef
[FLINK-2733] [tests] Harden ZooKeeperLeaderElectionTest Hardens ZooKeeperLeaderElectionTest by allowing the testing listener to return outdated leader information. This can happen if the ZooKeeper connection was suspended and the new leader information has not been sent to the testing listener. In this case, the testing listener will be queried again to return the actual leader information. Add debug statements to ZooKeeperLeaderElectionTest.testZooKeeperReelection. This closes #2103. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a6bab3e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a6bab3e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a6bab3e Branch: refs/heads/master Commit: 1a6bab3ef76805685044cf4521e32315169f9033 Parents: 4d8cbec Author: Till Rohrmann <[email protected]> Authored: Mon Jun 6 17:18:59 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Fri Jun 17 14:49:59 2016 +0200 ---------------------------------------------------------------------- .../ZooKeeperLeaderElectionService.java | 4 +- .../runtime/leaderelection/TestingListener.java | 4 +- .../ZooKeeperLeaderElectionTest.java | 39 +++++++++++++++----- 3 files changed, 34 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java index e9aaaf8..0fa6a9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java @@ -337,7 +337,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le break; case SUSPENDED: LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContender.getAddress() - + "no longer participates in the leader election."); + + " no longer participates in the leader election."); break; case RECONNECTED: LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted."); @@ -345,7 +345,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le case LOST: // Maybe we have to throw an exception here to terminate the JobManager LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContender.getAddress() - + "no longer participates in the leader election."); + + " no longer participates in the leader election."); break; } } http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java index 54ee822..87decc7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java @@ -47,7 +47,7 @@ public class TestingListener implements LeaderRetrievalListener { return leaderSessionID; } - public void waitForNewLeader(long timeout) throws Exception { + public String waitForNewLeader(long timeout) throws Exception { long start = System.currentTimeMillis(); long curTimeout; @@ -72,6 +72,8 @@ public class TestingListener implements LeaderRetrievalListener { } oldAddress = address; + + return address; } 
@Override http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java index 34a582f..6ebf2dd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java @@ -38,9 +38,12 @@ import org.junit.Test; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.Promise; +import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; import java.io.ByteArrayOutputStream; @@ -58,6 +61,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { private static final String TEST_URL = "akka//user/jobmanager"; private static final FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS); + private static Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class); + @Before public void before() { try { @@ -132,7 +137,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString()); configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper"); - int num = 50; + Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow(); + + int num = 25; ZooKeeperLeaderElectionService[] leaderElectionService = new ZooKeeperLeaderElectionService[num]; TestingContender[] contenders = new 
TestingContender[num]; @@ -143,40 +150,52 @@ public class ZooKeeperLeaderElectionTest extends TestLogger { try { leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration); + LOG.debug("Start leader retrieval service for the TestingListener."); + leaderRetrievalService.start(listener); for (int i = 0; i < num; i++) { leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(configuration); contenders[i] = new TestingContender(TEST_URL + "_" + i, leaderElectionService[i]); + LOG.debug("Start leader election service for contender #{}.", i); + leaderElectionService[i].start(contenders[i]); } String pattern = TEST_URL + "_" + "(\\d+)"; Pattern regex = Pattern.compile(pattern); - for (int i = 0; i < num; i++) { - listener.waitForNewLeader(timeout.toMillis()); + int numberSeenLeaders = 0; - String address = listener.getAddress(); + while (deadline.hasTimeLeft() && numberSeenLeaders < num) { + LOG.debug("Wait for new leader #{}.", numberSeenLeaders); + String address = listener.waitForNewLeader(deadline.timeLeft().toMillis()); Matcher m = regex.matcher(address); if (m.find()) { int index = Integer.parseInt(m.group(1)); - // check that the leader session ID of the listeners and the leader are equal - assertEquals(listener.getLeaderSessionID(), contenders[index].getLeaderSessionID()); - assertEquals(TEST_URL + "_" + index, listener.getAddress()); + TestingContender contender = contenders[index]; - // kill the election service of the leader - leaderElectionService[index].stop(); - leaderElectionService[index] = null; + // check that the retrieval service has retrieved the correct leader + if (address.equals(contender.getAddress()) && listener.getLeaderSessionID().equals(contender.getLeaderSessionID())) { + // kill the election service of the leader + LOG.debug("Stop leader election service of contender #{}.", numberSeenLeaders); + leaderElectionService[index].stop(); + leaderElectionService[index] = null; + + numberSeenLeaders++; + } } 
else { fail("Did not find the leader's index."); } } + assertFalse(deadline.isOverdue()); + assertEquals(num, numberSeenLeaders); + } finally { if (leaderRetrievalService != null) { leaderRetrievalService.stop();
