Repository: flink
Updated Branches:
  refs/heads/master 4d8cbec4f -> 1a6bab3ef


[FLINK-2733] [tests] Harden ZooKeeperLeaderElectionTest

Hardens ZooKeeperLeaderElectionTest by allowing the testing listener to return
outdated leader information. This can happen if the ZooKeeper connection
was suspended and the new leader information has not yet been sent to the
testing listener. In this case, the testing listener is queried again
until it returns the current leader information.

Add debug statements to ZooKeeperLeaderElectionTest.testZooKeeperReelection

This closes #2103.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a6bab3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a6bab3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a6bab3e

Branch: refs/heads/master
Commit: 1a6bab3ef76805685044cf4521e32315169f9033
Parents: 4d8cbec
Author: Till Rohrmann <[email protected]>
Authored: Mon Jun 6 17:18:59 2016 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jun 17 14:49:59 2016 +0200

----------------------------------------------------------------------
 .../ZooKeeperLeaderElectionService.java         |  4 +-
 .../runtime/leaderelection/TestingListener.java |  4 +-
 .../ZooKeeperLeaderElectionTest.java            | 39 +++++++++++++++-----
 3 files changed, 34 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index e9aaaf8..0fa6a9e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -337,7 +337,7 @@ public class ZooKeeperLeaderElectionService implements 
LeaderElectionService, Le
                                break;
                        case SUSPENDED:
                                LOG.warn("Connection to ZooKeeper suspended. 
The contender " + leaderContender.getAddress()
-                                       + "no longer participates in the leader 
election.");
+                                       + " no longer participates in the 
leader election.");
                                break;
                        case RECONNECTED:
                                LOG.info("Connection to ZooKeeper was 
reconnected. Leader election can be restarted.");
@@ -345,7 +345,7 @@ public class ZooKeeperLeaderElectionService implements 
LeaderElectionService, Le
                        case LOST:
                                // Maybe we have to throw an exception here to 
terminate the JobManager
                                LOG.warn("Connection to ZooKeeper lost. The 
contender " + leaderContender.getAddress()
-                                       + "no longer participates in the leader 
election.");
+                                       + " no longer participates in the 
leader election.");
                                break;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
index 54ee822..87decc7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
@@ -47,7 +47,7 @@ public class TestingListener implements 
LeaderRetrievalListener {
                return leaderSessionID;
        }
 
-       public void waitForNewLeader(long timeout) throws Exception {
+       public String waitForNewLeader(long timeout) throws Exception {
                long start = System.currentTimeMillis();
                long curTimeout;
 
@@ -72,6 +72,8 @@ public class TestingListener implements 
LeaderRetrievalListener {
                }
 
                oldAddress = address;
+
+               return address;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 34a582f..6ebf2dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -38,9 +38,12 @@ import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
@@ -58,6 +61,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
        private static final String TEST_URL = "akka//user/jobmanager";
        private static final FiniteDuration timeout = new FiniteDuration(200, 
TimeUnit.SECONDS);
 
+       private static Logger LOG = 
LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class);
+
        @Before
        public void before() {
                try {
@@ -132,7 +137,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger 
{
                configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, 
testingServer.getConnectString());
                configuration.setString(ConfigConstants.RECOVERY_MODE, 
"zookeeper");
 
-               int num = 50;
+               Deadline deadline = new FiniteDuration(3, 
TimeUnit.MINUTES).fromNow();
+
+               int num = 25;
 
                ZooKeeperLeaderElectionService[] leaderElectionService = new 
ZooKeeperLeaderElectionService[num];
                TestingContender[] contenders = new TestingContender[num];
@@ -143,40 +150,52 @@ public class ZooKeeperLeaderElectionTest extends 
TestLogger {
                try {
                        leaderRetrievalService = 
ZooKeeperUtils.createLeaderRetrievalService(configuration);
 
+                       LOG.debug("Start leader retrieval service for the 
TestingListener.");
+
                        leaderRetrievalService.start(listener);
 
                        for (int i = 0; i < num; i++) {
                                leaderElectionService[i] = 
ZooKeeperUtils.createLeaderElectionService(configuration);
                                contenders[i] = new TestingContender(TEST_URL + 
"_" + i, leaderElectionService[i]);
 
+                               LOG.debug("Start leader election service for 
contender #{}.", i);
+
                                leaderElectionService[i].start(contenders[i]);
                        }
 
                        String pattern = TEST_URL + "_" + "(\\d+)";
                        Pattern regex = Pattern.compile(pattern);
 
-                       for (int i = 0; i < num; i++) {
-                               listener.waitForNewLeader(timeout.toMillis());
+                       int numberSeenLeaders = 0;
 
-                               String address = listener.getAddress();
+                       while (deadline.hasTimeLeft() && numberSeenLeaders < 
num) {
+                               LOG.debug("Wait for new leader #{}.", 
numberSeenLeaders);
+                               String address = 
listener.waitForNewLeader(deadline.timeLeft().toMillis());
 
                                Matcher m = regex.matcher(address);
 
                                if (m.find()) {
                                        int index = 
Integer.parseInt(m.group(1));
 
-                                       // check that the leader session ID of 
the listeners and the leader are equal
-                                       
assertEquals(listener.getLeaderSessionID(), 
contenders[index].getLeaderSessionID());
-                                       assertEquals(TEST_URL + "_" + index, 
listener.getAddress());
+                                       TestingContender contender = 
contenders[index];
 
-                                       // kill the election service of the 
leader
-                                       leaderElectionService[index].stop();
-                                       leaderElectionService[index] = null;
+                                       // check that the retrieval service has 
retrieved the correct leader
+                                       if 
(address.equals(contender.getAddress()) && 
listener.getLeaderSessionID().equals(contender.getLeaderSessionID())) {
+                                               // kill the election service of 
the leader
+                                               LOG.debug("Stop leader election 
service of contender #{}.", numberSeenLeaders);
+                                               
leaderElectionService[index].stop();
+                                               leaderElectionService[index] = 
null;
+
+                                               numberSeenLeaders++;
+                                       }
                                } else {
                                        fail("Did not find the leader's 
index.");
                                }
                        }
 
+                       assertFalse(deadline.isOverdue());
+                       assertEquals(num, numberSeenLeaders);
+
                } finally {
                        if (leaderRetrievalService != null) {
                                leaderRetrievalService.stop();

Reply via email to