This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 22e9357d40f7f56fcb671e1076d8d7271316fcdf
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Apr 15 11:56:23 2021 +0200

    [FLINK-22248][tests] Improve JobMasterStopWithSavepoint#waitForJob to wait for all tasks running
---
 .../JobMasterStopWithSavepointITCase.java          | 25 ++++++++++------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
index 6eec106..d4740992 100644
--- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Deadline;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
@@ -313,20 +315,15 @@ public class JobMasterStopWithSavepointITCase extends AbstractTestBase {
     }
 
     private void waitForJob() throws Exception {
-        for (int i = 0; i < 60; i++) {
-            try {
-                final JobStatus jobStatus =
-                        clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
-                assertThat(jobStatus.isGloballyTerminalState(), equalTo(false));
-                if (jobStatus == JobStatus.RUNNING) {
-                    return;
-                }
-            } catch (ExecutionException ignored) {
-                // JobManagerRunner is not yet registered in Dispatcher
-            }
-            Thread.sleep(1000);
-        }
-        throw new AssertionError("Job did not become running within timeout.");
+        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+        JobID jobID = jobGraph.getJobID();
+        CommonTestUtils.waitForAllTaskRunning(
+                () ->
+                        miniClusterResource
+                                .getMiniCluster()
+                                .getExecutionGraph(jobID)
+                                .get(60, TimeUnit.SECONDS),
+                deadline);
     }
 
     /**
