Repository: flink Updated Branches: refs/heads/master c04941d2a -> cc9334a46
[FLINK-5616] [tests] Harden YarnIntraNonHaMasterServicesTest.testClosingReportsToLeader Prevent the race condition between shutting down the leader election service and the contender becoming the leader by waiting on a LeaderRetrievalListener. The problem was that if the service has been shut down before the contender has become the leader, then the contender would not be notified about the shut down via the handleError method. At correct waiting behaviour using Mockito's verify statement This closes #3327. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc9334a4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc9334a4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc9334a4 Branch: refs/heads/master Commit: cc9334a4694b06abde2723548f9576256ae063db Parents: c04941d Author: Till Rohrmann <[email protected]> Authored: Wed Feb 15 18:15:50 2017 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Feb 17 13:52:56 2017 +0100 ---------------------------------------------------------------------- .../leaderelection/SingleLeaderElectionService.java | 2 ++ .../YarnIntraNonHaMasterServicesTest.java | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cc9334a4/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java index 26e3cbf..96e1390 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java @@ -92,6 +92,8 @@ public class SingleLeaderElectionService implements LeaderElectionService { this.notificationExecutor = checkNotNull(notificationsDispatcher); this.leaderId = checkNotNull(leaderId); this.listeners = new HashSet<>(); + + shutdown = false; } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/cc9334a4/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java index 0e7bf0f..64c22d2 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java @@ -22,8 +22,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TestLogger; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.junit.AfterClass; @@ -41,13 +44,14 @@ import java.util.Random; import java.util.UUID; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class YarnIntraNonHaMasterServicesTest { +public class YarnIntraNonHaMasterServicesTest extends TestLogger { private static final Random RND = new Random(); @@ -112,12 +116,20 @@ public class YarnIntraNonHaMasterServicesTest { try (YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig)) { final LeaderElectionService elector = services.getResourceManagerLeaderElectionService(); + final LeaderRetrievalService retrieval = services.getResourceManagerLeaderRetriever(); final LeaderContender contender = mockContender(elector); + final LeaderRetrievalListener listener = mock(LeaderRetrievalListener.class); elector.start(contender); + retrieval.start(listener); + + // wait until the contender has become the leader + verify(listener, timeout(1000L).times(1)).notifyLeaderAddress(anyString(), any(UUID.class)); + + // now we can close the election service services.close(); - verify(contender, timeout(100).times(1)).handleError(any(Exception.class)); + verify(contender, timeout(1000L).times(1)).handleError(any(Exception.class)); } }
