This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 378d3ca0d4b487b1ebc9354e9ebe8952cc3a9d11 Author: David Moravek <[email protected]> AuthorDate: Wed Apr 19 08:45:38 2023 +0200 [FLINK-31723] Refactor DispatcherTest#testCancellationDuringInitialization to use Assertj matchers and to make it more explicit what it actually tests. --- .../flink/runtime/dispatcher/DispatcherTest.java | 55 ++++++++++++---------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index cd50911ff1e..ff86aa91baa 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -366,37 +366,40 @@ public class DispatcherTest extends AbstractDispatcherTest { @Test public void testCancellationDuringInitialization() throws Exception { + final Tuple2<JobGraph, JobVertexBlockingOnInitializeOnJobMaster> blockingJobGraphAndVertex = getJobGraphAndVertexBlockingOnInitializeOnJobMaster(); + final JobGraph blockingJobGraph = blockingJobGraphAndVertex.f0; + final JobVertexBlockingOnInitializeOnJobMaster blockingVertex = blockingJobGraphAndVertex.f1; + dispatcher = createAndStartDispatcher( heartbeatServices, haServices, - new ExpectedJobIdJobManagerRunnerFactory(jobId)); + new ExpectedJobIdJobManagerRunnerFactory(blockingJobGraph.getJobID())); jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); - DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - - // create a job graph of a job that blocks forever - Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = getBlockingJobGraphAndVertex(); - JobID jobID = blockingJobGraph.f0.getJobID(); + final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); - dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get(); - - assertThat( - dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(), - is(JobStatus.INITIALIZING)); + assertThatFuture(dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT)).eventuallySucceeds(); + assertThatFuture( + dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT)) + .eventuallySucceeds() + .isEqualTo(JobStatus.INITIALIZING); // submission has succeeded, now cancel the job - CompletableFuture<Acknowledge> cancellationFuture = - dispatcherGateway.cancelJob(jobID, TIMEOUT); - assertThat( - dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(), is(JobStatus.CANCELLING)); - assertThat(cancellationFuture.isDone(), is(false)); - // unblock - blockingJobGraph.f1.unblock(); + final CompletableFuture<Acknowledge> cancellationFuture = + dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), TIMEOUT); + assertThatFuture( + dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT)).eventuallySucceeds().isEqualTo(JobStatus.CANCELLING); + assertThatFuture(cancellationFuture).isNotDone(); + + // unblock initialization + blockingVertex.unblock(); // wait until cancelled - cancellationFuture.get(); - assertThat( - dispatcherGateway.requestJobResult(jobID, TIMEOUT).get().getApplicationStatus(), - is(ApplicationStatus.CANCELED)); + assertThatFuture(cancellationFuture).eventuallySucceeds(); + + assertThatFuture( + dispatcherGateway.requestJobResult(blockingJobGraph.getJobID(), TIMEOUT)).eventuallySucceeds().extracting( + JobResult::getApplicationStatus) + .isEqualTo(ApplicationStatus.CANCELED); } @Test @@ -1608,8 +1611,8 @@ public class DispatcherTest extends AbstractDispatcherTest { } } - private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() { - final BlockingJobVertex blockingJobVertex = new BlockingJobVertex("testVertex"); + private Tuple2<JobGraph, JobVertexBlockingOnInitializeOnJobMaster> getJobGraphAndVertexBlockingOnInitializeOnJobMaster() { + final JobVertexBlockingOnInitializeOnJobMaster blockingJobVertex = new JobVertexBlockingOnInitializeOnJobMaster("testVertex"); blockingJobVertex.setInvokableClass(NoOpInvokable.class); // AdaptiveScheduler expects the parallelism to be set for each vertex blockingJobVertex.setParallelism(1); @@ -1715,10 +1718,10 @@ public class DispatcherTest extends AbstractDispatcherTest { } } - private static class BlockingJobVertex extends JobVertex { + private static class JobVertexBlockingOnInitializeOnJobMaster extends JobVertex { private final OneShotLatch oneShotLatch = new OneShotLatch(); - private BlockingJobVertex(String name) { + private JobVertexBlockingOnInitializeOnJobMaster(String name) { super(name); }
