This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 22e9357d40f7f56fcb671e1076d8d7271316fcdf
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Apr 15 11:56:23 2021 +0200

    [FLINK-22248][tests] Improve JobMasterStopWithSavepoint#waitForJob to wait 
for all tasks running
---
 .../JobMasterStopWithSavepointITCase.java          | 25 ++++++++++------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
index 6eec106..d4740992 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
@@ -313,20 +315,15 @@ public class JobMasterStopWithSavepointITCase extends 
AbstractTestBase {
     }
 
     private void waitForJob() throws Exception {
-        for (int i = 0; i < 60; i++) {
-            try {
-                final JobStatus jobStatus =
-                        
clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
-                assertThat(jobStatus.isGloballyTerminalState(), 
equalTo(false));
-                if (jobStatus == JobStatus.RUNNING) {
-                    return;
-                }
-            } catch (ExecutionException ignored) {
-                // JobManagerRunner is not yet registered in Dispatcher
-            }
-            Thread.sleep(1000);
-        }
-        throw new AssertionError("Job did not become running within timeout.");
+        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+        JobID jobID = jobGraph.getJobID();
+        CommonTestUtils.waitForAllTaskRunning(
+                () ->
+                        miniClusterResource
+                                .getMiniCluster()
+                                .getExecutionGraph(jobID)
+                                .get(60, TimeUnit.SECONDS),
+                deadline);
     }
 
     /**

Reply via email to