This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52bf14b0ba949e048c78862be2ed8ebfb58c780e
Author: David Moravek <[email protected]>
AuthorDate: Thu Apr 20 13:35:27 2023 +0200

    [FLINK-31723] Fix DispatcherTest#testCancellationDuringInitialization to not make assumptions about an underlying scheduler implementation.
---
 .../flink/runtime/dispatcher/DispatcherTest.java   | 170 ++++++++++++++-------
 .../DefaultJobMasterServiceProcessTest.java        |   2 +-
 .../factories/TestingJobMasterServiceFactory.java  |  14 +-
 3 files changed, 120 insertions(+), 66 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index ff86aa91baa..3bd1821f44b 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.operators.ResourceSpec;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptions;
@@ -61,6 +60,7 @@ import 
org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
 import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
 import 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProcessFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
+import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
@@ -366,40 +366,36 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
 
     @Test
     public void testCancellationDuringInitialization() throws Exception {
-        final Tuple2<JobGraph, JobVertexBlockingOnInitializeOnJobMaster> 
blockingJobGraphAndVertex = 
getJobGraphAndVertexBlockingOnInitializeOnJobMaster();
-        final JobGraph blockingJobGraph = blockingJobGraphAndVertex.f0;
-        final JobVertexBlockingOnInitializeOnJobMaster blockingVertex = 
blockingJobGraphAndVertex.f1;
+        final CancellableJobManagerRunnerWithInitializedJobFactory 
runnerFactory =
+                new 
CancellableJobManagerRunnerWithInitializedJobFactory(jobId);
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, 
runnerFactory);
 
-        dispatcher =
-                createAndStartDispatcher(
-                        heartbeatServices,
-                        haServices,
-                        new 
ExpectedJobIdJobManagerRunnerFactory(blockingJobGraph.getJobID()));
         jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
-        final DispatcherGateway dispatcherGateway = 
dispatcher.getSelfGateway(DispatcherGateway.class);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
 
-        assertThatFuture(dispatcherGateway.submitJob(blockingJobGraph, 
TIMEOUT)).eventuallySucceeds();
-        assertThatFuture(
-                
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT))
+        assertThatFuture(dispatcherGateway.submitJob(jobGraph, 
TIMEOUT)).eventuallySucceeds();
+
+        
assertThatFuture(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), 
TIMEOUT))
                 .eventuallySucceeds()
                 .isEqualTo(JobStatus.INITIALIZING);
 
         // submission has succeeded, now cancel the job
-        final CompletableFuture<Acknowledge> cancellationFuture =
-                dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), 
TIMEOUT);
-        assertThatFuture(
-                
dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), 
TIMEOUT)).eventuallySucceeds().isEqualTo(JobStatus.CANCELLING);
-        assertThatFuture(cancellationFuture).isNotDone();
+        final CompletableFuture<Acknowledge> cancellationRequestFuture =
+                dispatcherGateway.cancelJob(jobGraph.getJobID(), TIMEOUT);
+        
assertThatFuture(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), 
TIMEOUT))
+                .eventuallySucceeds()
+                .isEqualTo(JobStatus.CANCELLING);
+        assertThatFuture(cancellationRequestFuture).isNotDone();
 
-        // unblock initialization
-        blockingVertex.unblock();
-        // wait until cancelled
-        assertThatFuture(cancellationFuture).eventuallySucceeds();
+        // unblock the job cancellation
+        runnerFactory.unblockCancellation();
+        assertThatFuture(cancellationRequestFuture).eventuallySucceeds();
 
-        assertThatFuture(
-                
dispatcherGateway.requestJobResult(blockingJobGraph.getJobID(), 
TIMEOUT)).eventuallySucceeds().extracting(
-                        JobResult::getApplicationStatus)
-                        .isEqualTo(ApplicationStatus.CANCELED);
+        
assertThatFuture(dispatcherGateway.requestJobResult(jobGraph.getJobID(), 
TIMEOUT))
+                .eventuallySucceeds()
+                .extracting(JobResult::getApplicationStatus)
+                .isEqualTo(ApplicationStatus.CANCELED);
     }
 
     @Test
@@ -1428,6 +1424,94 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                                                                 jobGraph)))));
     }
 
+    private static class CancellableJobManagerRunnerWithInitializedJobFactory
+            implements JobManagerRunnerFactory {
+
+        private final JobID expectedJobId;
+
+        private final AtomicReference<JobStatus> jobStatus =
+                new AtomicReference(JobStatus.INITIALIZING);
+        private final CompletableFuture<Void> cancellationFuture = new 
CompletableFuture<>();
+
+        private CancellableJobManagerRunnerWithInitializedJobFactory(JobID 
expectedJobId) {
+            this.expectedJobId = expectedJobId;
+        }
+
+        @Override
+        public JobManagerRunner createJobManagerRunner(
+                JobGraph jobGraph,
+                Configuration configuration,
+                RpcService rpcService,
+                HighAvailabilityServices highAvailabilityServices,
+                HeartbeatServices heartbeatServices,
+                JobManagerSharedServices jobManagerServices,
+                JobManagerJobMetricGroupFactory 
jobManagerJobMetricGroupFactory,
+                FatalErrorHandler fatalErrorHandler,
+                long initializationTimestamp)
+                throws Exception {
+            assertEquals(expectedJobId, jobGraph.getJobID());
+            final JobMasterGateway jobMasterGateway =
+                    new TestingJobMasterGatewayBuilder()
+                            .setRequestJobSupplier(
+                                    () -> {
+                                        final ExecutionGraphInfo 
executionGraphInfo =
+                                                new ExecutionGraphInfo(
+                                                        new 
ArchivedExecutionGraphBuilder()
+                                                                
.setState(jobStatus.get())
+                                                                .build());
+                                        return 
CompletableFuture.completedFuture(
+                                                executionGraphInfo);
+                                    })
+                            .setCancelFunction(
+                                    () -> {
+                                        jobStatus.set(JobStatus.CANCELLING);
+                                        return cancellationFuture.thenApply(
+                                                ignored -> {
+                                                    
jobStatus.set(JobStatus.CANCELED);
+                                                    return Acknowledge.get();
+                                                });
+                                    })
+                            .build();
+
+            final JobMasterServiceFactory jobMasterServiceFactory =
+                    new TestingJobMasterServiceFactory(
+                            onCompletionActions -> {
+                                final TestingJobMasterService jobMasterService 
=
+                                        new 
TestingJobMasterService(jobMasterGateway);
+                                cancellationFuture.thenRun(
+                                        () ->
+                                                
onCompletionActions.jobReachedGloballyTerminalState(
+                                                        new ExecutionGraphInfo(
+                                                                new 
ArchivedExecutionGraphBuilder()
+                                                                        
.setJobID(
+                                                                               
 jobGraph.getJobID())
+                                                                        
.setState(
+                                                                               
 JobStatus.CANCELED)
+                                                                        
.build())));
+                                return 
CompletableFuture.completedFuture(jobMasterService);
+                            });
+
+            return new JobMasterServiceLeadershipRunner(
+                    new DefaultJobMasterServiceProcessFactory(
+                            jobGraph.getJobID(),
+                            jobGraph.getName(),
+                            jobGraph.getCheckpointingSettings(),
+                            initializationTimestamp,
+                            jobMasterServiceFactory),
+                    
highAvailabilityServices.getJobManagerLeaderElectionService(
+                            jobGraph.getJobID()),
+                    highAvailabilityServices.getJobResultStore(),
+                    jobManagerServices
+                            .getLibraryCacheManager()
+                            .registerClassLoaderLease(jobGraph.getJobID()),
+                    fatalErrorHandler);
+        }
+
+        public void unblockCancellation() {
+            cancellationFuture.complete(null);
+        }
+    }
+
     private static class JobManagerRunnerWithBlockingJobMasterFactory
             implements JobManagerRunnerFactory {
 
@@ -1479,7 +1563,7 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
                             jobGraph.getCheckpointingSettings(),
                             initializationTimestamp,
                             new TestingJobMasterServiceFactory(
-                                    () -> {
+                                    ignored -> {
                                         initLatch.trigger();
                                         final 
CompletableFuture<JobMasterService> result =
                                                 new CompletableFuture<>();
@@ -1611,20 +1695,6 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
         }
     }
 
-    private Tuple2<JobGraph, JobVertexBlockingOnInitializeOnJobMaster> 
getJobGraphAndVertexBlockingOnInitializeOnJobMaster() {
-        final JobVertexBlockingOnInitializeOnJobMaster blockingJobVertex = new 
JobVertexBlockingOnInitializeOnJobMaster("testVertex");
-        blockingJobVertex.setInvokableClass(NoOpInvokable.class);
-        // AdaptiveScheduler expects the parallelism to be set for each vertex
-        blockingJobVertex.setParallelism(1);
-
-        return Tuple2.of(
-                JobGraphBuilder.newStreamingJobGraphBuilder()
-                        .setJobId(jobId)
-                        .addJobVertex(blockingJobVertex)
-                        .build(),
-                blockingJobVertex);
-    }
-
     private static final class ExpectedJobIdJobManagerRunnerFactory
             implements JobManagerRunnerFactory {
 
@@ -1717,22 +1787,4 @@ public class DispatcherTest extends 
AbstractDispatcherTest {
             return runner;
         }
     }
-
-    private static class JobVertexBlockingOnInitializeOnJobMaster extends 
JobVertex {
-        private final OneShotLatch oneShotLatch = new OneShotLatch();
-
-        private JobVertexBlockingOnInitializeOnJobMaster(String name) {
-            super(name);
-        }
-
-        @Override
-        public void initializeOnMaster(InitializeOnMasterContext context) 
throws Exception {
-            super.initializeOnMaster(context);
-            oneShotLatch.await();
-        }
-
-        public void unblock() {
-            oneShotLatch.trigger();
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java
index c21c8e31488..72ff42bf8c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java
@@ -274,7 +274,7 @@ class DefaultJobMasterServiceProcessTest {
         return new DefaultJobMasterServiceProcess(
                 jobId,
                 UUID.randomUUID(),
-                new TestingJobMasterServiceFactory(() -> 
jobMasterServiceFuture),
+                new TestingJobMasterServiceFactory(ignored -> 
jobMasterServiceFuture),
                 failedArchivedExecutionGraphFactory);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
index a690cebf236..6c7e54f3ab2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java
@@ -24,24 +24,26 @@ import 
org.apache.flink.runtime.jobmaster.TestingJobMasterService;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
+import java.util.function.Function;
 
 /** Testing implementation of the {@link JobMasterServiceFactory}. */
 public class TestingJobMasterServiceFactory implements JobMasterServiceFactory 
{
-    private final Supplier<CompletableFuture<JobMasterService>> 
jobMasterServiceSupplier;
+    private final Function<OnCompletionActions, 
CompletableFuture<JobMasterService>>
+            jobMasterServiceFunction;
 
     public TestingJobMasterServiceFactory(
-            Supplier<CompletableFuture<JobMasterService>> 
jobMasterServiceSupplier) {
-        this.jobMasterServiceSupplier = jobMasterServiceSupplier;
+            Function<OnCompletionActions, CompletableFuture<JobMasterService>>
+                    jobMasterServiceFunction) {
+        this.jobMasterServiceFunction = jobMasterServiceFunction;
     }
 
     public TestingJobMasterServiceFactory() {
-        this(() -> CompletableFuture.completedFuture(new 
TestingJobMasterService()));
+        this(ignored -> CompletableFuture.completedFuture(new 
TestingJobMasterService()));
     }
 
     @Override
     public CompletableFuture<JobMasterService> createJobMasterService(
             UUID leaderSessionId, OnCompletionActions onCompletionActions) {
-        return jobMasterServiceSupplier.get();
+        return jobMasterServiceFunction.apply(onCompletionActions);
     }
 }

Reply via email to