Repository: flink
Updated Branches:
  refs/heads/master c04941d2a -> cc9334a46


[FLINK-5616] [tests] Harden YarnIntraNonHaMasterServicesTest.testClosingReportsToLeader

Prevent the race condition between shutting down the leader election service
and the contender becoming the leader by waiting on a LeaderRetrievalListener.
The problem was that if the service had been shut down before the contender
became the leader, the contender would not be notified about the shutdown via
the handleError method.

Add correct waiting behaviour using Mockito's verify statement.

This closes #3327.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc9334a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc9334a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc9334a4

Branch: refs/heads/master
Commit: cc9334a4694b06abde2723548f9576256ae063db
Parents: c04941d
Author: Till Rohrmann <[email protected]>
Authored: Wed Feb 15 18:15:50 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Feb 17 13:52:56 2017 +0100

----------------------------------------------------------------------
 .../leaderelection/SingleLeaderElectionService.java |  2 ++
 .../YarnIntraNonHaMasterServicesTest.java           | 16 ++++++++++++++--
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc9334a4/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
index 26e3cbf..96e1390 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
@@ -92,6 +92,8 @@ public class SingleLeaderElectionService implements 
LeaderElectionService {
                this.notificationExecutor = 
checkNotNull(notificationsDispatcher);
                this.leaderId = checkNotNull(leaderId);
                this.listeners = new HashSet<>();
+
+               shutdown = false;
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cc9334a4/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
 
b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
index 0e7bf0f..64c22d2 100644
--- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
+++ 
b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
@@ -22,8 +22,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.StringUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
 import org.junit.AfterClass;
@@ -41,13 +44,14 @@ import java.util.Random;
 import java.util.UUID;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class YarnIntraNonHaMasterServicesTest {
+public class YarnIntraNonHaMasterServicesTest extends TestLogger {
 
        private static final Random RND = new Random();
 
@@ -112,12 +116,20 @@ public class YarnIntraNonHaMasterServicesTest {
 
                try (YarnHighAvailabilityServices services = new 
YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig)) {
                        final LeaderElectionService elector = 
services.getResourceManagerLeaderElectionService();
+                       final LeaderRetrievalService retrieval = 
services.getResourceManagerLeaderRetriever();
                        final LeaderContender contender = 
mockContender(elector);
+                       final LeaderRetrievalListener listener = 
mock(LeaderRetrievalListener.class);
 
                        elector.start(contender);
+                       retrieval.start(listener);
+
+                       // wait until the contender has become the leader
+                       verify(listener, 
timeout(1000L).times(1)).notifyLeaderAddress(anyString(), any(UUID.class));
+
+                       // now we can close the election service
                        services.close();
 
-                       verify(contender, 
timeout(100).times(1)).handleError(any(Exception.class));
+                       verify(contender, 
timeout(1000L).times(1)).handleError(any(Exception.class));
                }
        }
 

Reply via email to