This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bcf56de51eeae89bcaadb37fa09c053aa3444f1f Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Jun 14 13:58:10 2022 +0200 [FLINK-28052][tests] Remove RunFailedJobListener The RunFailedJobListener had rather obscure semantics. It considered a job to be terminal after it was restarted. This is awfully specific to a particular test case. A cleaner approach is to just cancel the job and wait for it to terminate. Additionally, it considered a job as running purely based on the job status, whereas, in particular when checkpointing is involved, waiting for the tasks to be submitted is a better measure. In fact, testExceptionHistoryWithTaskFailureFromStopWithSavepoint was broken, since a savepoint was never triggered, as not all tasks were running. --- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 44 ++++------------------ 1 file changed, 7 insertions(+), 37 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index ee3f211807e..28200416199 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.core.execution.SavepointFormatType; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; @@ -49,7 +48,6 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest; import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; 
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy; @@ -993,36 +991,6 @@ public class AdaptiveSchedulerTest extends TestLogger { .isFalse(); } - static class RunFailedJobListener implements JobStatusListener { - OneShotLatch jobRunning; - OneShotLatch jobTerminal; - - public RunFailedJobListener() { - this.jobRunning = new OneShotLatch(); - this.jobTerminal = new OneShotLatch(); - } - - @Override - public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) { - if (newJobStatus == JobStatus.RUNNING) { - jobRunning.trigger(); - return; - } - boolean hasRestarted = jobRunning.isTriggered() && newJobStatus == JobStatus.CREATED; - if (newJobStatus == JobStatus.FAILED || hasRestarted) { - jobTerminal.trigger(); - } - } - - public void waitForRunning() throws InterruptedException { - jobRunning.await(); - } - - public void waitForTerminal() throws InterruptedException { - jobTerminal.await(); - } - } - private Iterable<RootExceptionHistoryEntry> runExceptionHistoryTests( BiConsumer<AdaptiveScheduler, List<ExecutionAttemptID>> testLogic) throws Exception { return runExceptionHistoryTests(testLogic, ignored -> {}, ignored -> {}); @@ -1042,7 +1010,6 @@ public class AdaptiveSchedulerTest extends TestLogger { throws Exception { final JobGraph jobGraph = createJobGraph(); setupJobGraph.accept(jobGraph); - RunFailedJobListener listener = new RunFailedJobListener(); final CompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); @@ -1062,8 +1029,7 @@ public class AdaptiveSchedulerTest extends TestLogger { 
.setJobMasterConfiguration(configuration) .setDeclarativeSlotPool(declarativeSlotPool) .setCheckpointRecoveryFactory(checkpointRecoveryFactory) - .setCheckpointCleaner(checkpointCleaner) - .setJobStatusListener(listener); + .setCheckpointCleaner(checkpointCleaner); setupScheduler.accept(builder); final AdaptiveScheduler scheduler = builder.build(EXECUTOR_RESOURCE.getExecutor()); @@ -1090,7 +1056,10 @@ public class AdaptiveSchedulerTest extends TestLogger { ResourceProfile.UNKNOWN, PARALLELISM)), taskManagerGateway); }); - listener.waitForRunning(); + // wait for all tasks to be deployed + // this is important because some tests trigger savepoints + // these only properly work if the deployment has been started + taskManagerGateway.waitForSubmissions(PARALLELISM, TestingUtils.infiniteDuration()); CompletableFuture<Iterable<ArchivedExecutionVertex>> vertexFuture = new CompletableFuture<>(); @@ -1113,7 +1082,8 @@ public class AdaptiveSchedulerTest extends TestLogger { singleThreadMainThreadExecutor); runTestLogicFuture.get(); - listener.waitForTerminal(); + singleThreadMainThreadExecutor.execute(scheduler::cancel); + scheduler.getJobTerminationFuture().get(); return scheduler.requestJob().getExceptionHistory(); }
