This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 378d3ca0d4b487b1ebc9354e9ebe8952cc3a9d11
Author: David Moravek <[email protected]>
AuthorDate: Wed Apr 19 08:45:38 2023 +0200

    [FLINK-31723] Refactor DispatcherTest#testCancellationDuringInitialization 
to use Assertj matchers and to make it more explicit  what it actually tests.
---
 .../flink/runtime/dispatcher/DispatcherTest.java   | 55 ++++++++++++----------
 1 file changed, 29 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index cd50911ff1e..ff86aa91baa 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -366,37 +366,40 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
 
     @Test
     public void testCancellationDuringInitialization() throws Exception {
+        final Tuple2<JobGraph, JobVertexBlockingOnInitializeOnJobMaster> 
blockingJobGraphAndVertex = 
getJobGraphAndVertexBlockingOnInitializeOnJobMaster();
+        final JobGraph blockingJobGraph = blockingJobGraphAndVertex.f0;
+        final JobVertexBlockingOnInitializeOnJobMaster blockingVertex = 
blockingJobGraphAndVertex.f1;
+
         dispatcher =
                 createAndStartDispatcher(
                         heartbeatServices,
                         haServices,
-                        new ExpectedJobIdJobManagerRunnerFactory(jobId));
+                        new 
ExpectedJobIdJobManagerRunnerFactory(blockingJobGraph.getJobID()));
         jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
-        DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
-
-        // create a job graph of a job that blocks forever
-        Tuple2<JobGraph, BlockingJobVertex> blockingJobGraph = 
getBlockingJobGraphAndVertex();
-        JobID jobID = blockingJobGraph.f0.getJobID();
+        final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
 
-        dispatcherGateway.submitJob(blockingJobGraph.f0, TIMEOUT).get();
-
-        assertThat(
-                dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(),
-                is(JobStatus.INITIALIZING));
+        assertThatFuture(dispatcherGateway.submitJob(blockingJobGraph, 
TIMEOUT)).eventuallySucceeds();
+        assertThatFuture(
+                
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT))
+                .eventuallySucceeds()
+                .isEqualTo(JobStatus.INITIALIZING);
 
         // submission has succeeded, now cancel the job
-        CompletableFuture<Acknowledge> cancellationFuture =
-                dispatcherGateway.cancelJob(jobID, TIMEOUT);
-        assertThat(
-                dispatcherGateway.requestJobStatus(jobID, TIMEOUT).get(), 
is(JobStatus.CANCELLING));
-        assertThat(cancellationFuture.isDone(), is(false));
-        // unblock
-        blockingJobGraph.f1.unblock();
+        final CompletableFuture<Acknowledge> cancellationFuture =
+                dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), 
TIMEOUT);
+        assertThatFuture(
+                
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), 
TIMEOUT)).eventuallySucceeds().isEqualTo(JobStatus.CANCELLING);
+        assertThatFuture(cancellationFuture).isNotDone();
+
+        // unblock initialization
+        blockingVertex.unblock();
         // wait until cancelled
-        cancellationFuture.get();
-        assertThat(
-                dispatcherGateway.requestJobResult(jobID, 
TIMEOUT).get().getApplicationStatus(),
-                is(ApplicationStatus.CANCELED));
+        assertThatFuture(cancellationFuture).eventuallySucceeds();
+
+        assertThatFuture(
+                
dispatcherGateway.requestJobResult(blockingJobGraph.getJobID(), 
TIMEOUT)).eventuallySucceeds().extracting(
+                        JobResult::getApplicationStatus)
+                        .isEqualTo(ApplicationStatus.CANCELED);
     }
 
     @Test
@@ -1608,8 +1611,8 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         }
     }
 
-    private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() 
{
-        final BlockingJobVertex blockingJobVertex = new 
BlockingJobVertex("testVertex");
+    private Tuple2<JobGraph, JobVertexBlockingOnInitializeOnJobMaster> 
getJobGraphAndVertexBlockingOnInitializeOnJobMaster() {
+        final JobVertexBlockingOnInitializeOnJobMaster blockingJobVertex = new 
JobVertexBlockingOnInitializeOnJobMaster("testVertex");
         blockingJobVertex.setInvokableClass(NoOpInvokable.class);
         // AdaptiveScheduler expects the parallelism to be set for each vertex
         blockingJobVertex.setParallelism(1);
@@ -1715,10 +1718,10 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         }
     }
 
-    private static class BlockingJobVertex extends JobVertex {
+    private static class JobVertexBlockingOnInitializeOnJobMaster extends 
JobVertex {
         private final OneShotLatch oneShotLatch = new OneShotLatch();
 
-        private BlockingJobVertex(String name) {
+        private JobVertexBlockingOnInitializeOnJobMaster(String name) {
             super(name);
         }
 

Reply via email to