Repository: flink
Updated Branches:
  refs/heads/master 21480e29a -> 9e7c6645f


[FLINK-3693] [tests] Wait for task manager to register before submitting job

- This test could fail when the job was submitted before the task manager
  had connected to the leading job manager (see [1])

[1] https://s3.amazonaws.com/archive.travis-ci.org/jobs/120072682/log.txt


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e7c6645
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e7c6645
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e7c6645

Branch: refs/heads/master
Commit: 9e7c6645f0f0e88cdfd092d72e810cd52352ca63
Parents: 21480e2
Author: Ufuk Celebi <[email protected]>
Authored: Mon Apr 4 14:33:23 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Apr 4 16:34:17 2016 +0200

----------------------------------------------------------------------
 .../test/recovery/JobManagerHAJobGraphRecoveryITCase.java | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e7c6645/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index bd83e52..2418853 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -60,6 +60,8 @@ import org.junit.Test;
 import scala.Option;
 import scala.Some;
 import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -298,6 +300,14 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
                                                leaderAddress, testSystem, 
deadline.timeLeft());
                                ActorGateway leader = new 
AkkaActorGateway(leaderRef, leaderId);
 
+                               int numSlots = 0;
+                               while (numSlots == 0) {
+                                       Future<?> slotsFuture = 
leader.ask(JobManagerMessages
+                                                       
.getRequestTotalNumberOfSlots(), deadline.timeLeft());
+
+                                       numSlots = (Integer) 
Await.result(slotsFuture, deadline.timeLeft());
+                               }
+
                                // Submit the job in non-detached mode
                                leader.tell(new SubmitJob(jobGraph,
                                                
ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), client);

Reply via email to