This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 52bf14b0ba949e048c78862be2ed8ebfb58c780e Author: David Moravek <[email protected]> AuthorDate: Thu Apr 20 13:35:27 2023 +0200 [FLINK-31723] Fix DispatcherTest#testCancellationDuringInitialization to not make assumptions about an underlying scheduler implementation. --- .../flink/runtime/dispatcher/DispatcherTest.java | 170 ++++++++++++++------- .../DefaultJobMasterServiceProcessTest.java | 2 +- .../factories/TestingJobMasterServiceFactory.java | 14 +- 3 files changed, 120 insertions(+), 66 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index ff86aa91baa..3bd1821f44b 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.operators.ResourceSpec; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.PipelineOptions; @@ -61,6 +60,7 @@ import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; import org.apache.flink.runtime.jobmaster.TestingJobMasterService; import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProcessFactory; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; +import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory; import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory; import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; @@ -366,40 +366,36 @@ public class DispatcherTest extends AbstractDispatcherTest { @Test public void testCancellationDuringInitialization() throws Exception { - final Tuple2<JobGraph, JobVertexBlockingOnInitializeOnJobMaster> blockingJobGraphAndVertex = getJobGraphAndVertexBlockingOnInitializeOnJobMaster(); - final JobGraph blockingJobGraph = blockingJobGraphAndVertex.f0; - final JobVertexBlockingOnInitializeOnJobMaster blockingVertex = blockingJobGraphAndVertex.f1; + final CancellableJobManagerRunnerWithInitializedJobFactory runnerFactory = + new CancellableJobManagerRunnerWithInitializedJobFactory(jobId); + dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory); - dispatcher = - createAndStartDispatcher( - heartbeatServices, - haServices, - new ExpectedJobIdJobManagerRunnerFactory(blockingJobGraph.getJobID())); jobMasterLeaderElectionService.isLeader(UUID.randomUUID()); - final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); + final DispatcherGateway dispatcherGateway = + dispatcher.getSelfGateway(DispatcherGateway.class); - assertThatFuture(dispatcherGateway.submitJob(blockingJobGraph, TIMEOUT)).eventuallySucceeds(); - assertThatFuture( - dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT)) + assertThatFuture(dispatcherGateway.submitJob(jobGraph, TIMEOUT)).eventuallySucceeds(); + + assertThatFuture(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT)) .eventuallySucceeds() .isEqualTo(JobStatus.INITIALIZING); // submission has succeeded, now cancel the job - final CompletableFuture<Acknowledge> cancellationFuture = - dispatcherGateway.cancelJob(blockingJobGraph.getJobID(), TIMEOUT); - assertThatFuture( - dispatcherGateway.requestJobStatus(blockingJobGraph.getJobID(), TIMEOUT)).eventuallySucceeds().isEqualTo(JobStatus.CANCELLING); - assertThatFuture(cancellationFuture).isNotDone(); + final CompletableFuture<Acknowledge> cancellationRequestFuture = + dispatcherGateway.cancelJob(jobGraph.getJobID(), TIMEOUT); + assertThatFuture(dispatcherGateway.requestJobStatus(jobGraph.getJobID(), TIMEOUT)) + .eventuallySucceeds() + .isEqualTo(JobStatus.CANCELLING); + assertThatFuture(cancellationRequestFuture).isNotDone(); - // unblock initialization - blockingVertex.unblock(); - // wait until cancelled - assertThatFuture(cancellationFuture).eventuallySucceeds(); + // unblock the job cancellation + runnerFactory.unblockCancellation(); + assertThatFuture(cancellationRequestFuture).eventuallySucceeds(); - assertThatFuture( - dispatcherGateway.requestJobResult(blockingJobGraph.getJobID(), TIMEOUT)).eventuallySucceeds().extracting( - JobResult::getApplicationStatus) - .isEqualTo(ApplicationStatus.CANCELED); + assertThatFuture(dispatcherGateway.requestJobResult(jobGraph.getJobID(), TIMEOUT)) + .eventuallySucceeds() + .extracting(JobResult::getApplicationStatus) + .isEqualTo(ApplicationStatus.CANCELED); } @Test @@ -1428,6 +1424,94 @@ public class DispatcherTest extends AbstractDispatcherTest { jobGraph))))); } + private static class CancellableJobManagerRunnerWithInitializedJobFactory + implements JobManagerRunnerFactory { + + private final JobID expectedJobId; + + private final AtomicReference<JobStatus> jobStatus = + new AtomicReference(JobStatus.INITIALIZING); + private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>(); + + private CancellableJobManagerRunnerWithInitializedJobFactory(JobID expectedJobId) { + this.expectedJobId = expectedJobId; + } + + @Override + public JobManagerRunner createJobManagerRunner( + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + JobManagerSharedServices jobManagerServices, + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler, + long initializationTimestamp) + throws Exception { + assertEquals(expectedJobId, jobGraph.getJobID()); + final JobMasterGateway jobMasterGateway = + new TestingJobMasterGatewayBuilder() + .setRequestJobSupplier( + () -> { + final ExecutionGraphInfo executionGraphInfo = + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setState(jobStatus.get()) + .build()); + return CompletableFuture.completedFuture( + executionGraphInfo); + }) + .setCancelFunction( + () -> { + jobStatus.set(JobStatus.CANCELLING); + return cancellationFuture.thenApply( + ignored -> { + jobStatus.set(JobStatus.CANCELED); + return Acknowledge.get(); + }); + }) + .build(); + + final JobMasterServiceFactory jobMasterServiceFactory = + new TestingJobMasterServiceFactory( + onCompletionActions -> { + final TestingJobMasterService jobMasterService = + new TestingJobMasterService(jobMasterGateway); + cancellationFuture.thenRun( + () -> + onCompletionActions.jobReachedGloballyTerminalState( + new ExecutionGraphInfo( + new ArchivedExecutionGraphBuilder() + .setJobID( + jobGraph.getJobID()) + .setState( + JobStatus.CANCELED) + .build()))); + return CompletableFuture.completedFuture(jobMasterService); + }); + + return new JobMasterServiceLeadershipRunner( + new DefaultJobMasterServiceProcessFactory( + jobGraph.getJobID(), + jobGraph.getName(), + jobGraph.getCheckpointingSettings(), + initializationTimestamp, + jobMasterServiceFactory), + highAvailabilityServices.getJobManagerLeaderElectionService( + jobGraph.getJobID()), + highAvailabilityServices.getJobResultStore(), + jobManagerServices + .getLibraryCacheManager() + .registerClassLoaderLease(jobGraph.getJobID()), + fatalErrorHandler); + } + + public void unblockCancellation() { + cancellationFuture.complete(null); + } + } + private static class JobManagerRunnerWithBlockingJobMasterFactory implements JobManagerRunnerFactory { @@ -1479,7 +1563,7 @@ public class DispatcherTest extends AbstractDispatcherTest { jobGraph.getCheckpointingSettings(), initializationTimestamp, new TestingJobMasterServiceFactory( - () -> { + ignored -> { initLatch.trigger(); final CompletableFuture<JobMasterService> result = new CompletableFuture<>(); @@ -1611,20 +1695,6 @@ public class DispatcherTest extends AbstractDispatcherTest { } } - private Tuple2<JobGraph, JobVertexBlockingOnInitializeOnJobMaster> getJobGraphAndVertexBlockingOnInitializeOnJobMaster() { - final JobVertexBlockingOnInitializeOnJobMaster blockingJobVertex = new JobVertexBlockingOnInitializeOnJobMaster("testVertex"); - blockingJobVertex.setInvokableClass(NoOpInvokable.class); - // AdaptiveScheduler expects the parallelism to be set for each vertex - blockingJobVertex.setParallelism(1); - - return Tuple2.of( - JobGraphBuilder.newStreamingJobGraphBuilder() - .setJobId(jobId) - .addJobVertex(blockingJobVertex) - .build(), - blockingJobVertex); - } - private static final class ExpectedJobIdJobManagerRunnerFactory implements JobManagerRunnerFactory { @@ -1717,22 +1787,4 @@ public class DispatcherTest extends AbstractDispatcherTest { return runner; } } - - private static class JobVertexBlockingOnInitializeOnJobMaster extends JobVertex { - private final OneShotLatch oneShotLatch = new OneShotLatch(); - - private JobVertexBlockingOnInitializeOnJobMaster(String name) { - super(name); - } - - @Override - public void initializeOnMaster(InitializeOnMasterContext context) throws Exception { - super.initializeOnMaster(context); - oneShotLatch.await(); - } - - public void unblock() { - oneShotLatch.trigger(); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java index c21c8e31488..72ff42bf8c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java @@ -274,7 +274,7 @@ class DefaultJobMasterServiceProcessTest { return new DefaultJobMasterServiceProcess( jobId, UUID.randomUUID(), - new TestingJobMasterServiceFactory(() -> jobMasterServiceFuture), + new TestingJobMasterServiceFactory(ignored -> jobMasterServiceFuture), failedArchivedExecutionGraphFactory); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java index a690cebf236..6c7e54f3ab2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/factories/TestingJobMasterServiceFactory.java @@ -24,24 +24,26 @@ import org.apache.flink.runtime.jobmaster.TestingJobMasterService; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.function.Supplier; +import java.util.function.Function; /** Testing implementation of the {@link JobMasterServiceFactory}. */ public class TestingJobMasterServiceFactory implements JobMasterServiceFactory { - private final Supplier<CompletableFuture<JobMasterService>> jobMasterServiceSupplier; + private final Function<OnCompletionActions, CompletableFuture<JobMasterService>> + jobMasterServiceFunction; public TestingJobMasterServiceFactory( - Supplier<CompletableFuture<JobMasterService>> jobMasterServiceSupplier) { - this.jobMasterServiceSupplier = jobMasterServiceSupplier; + Function<OnCompletionActions, CompletableFuture<JobMasterService>> + jobMasterServiceFunction) { + this.jobMasterServiceFunction = jobMasterServiceFunction; } public TestingJobMasterServiceFactory() { - this(() -> CompletableFuture.completedFuture(new TestingJobMasterService())); + this(ignored -> CompletableFuture.completedFuture(new TestingJobMasterService())); } @Override public CompletableFuture<JobMasterService> createJobMasterService( UUID leaderSessionId, OnCompletionActions onCompletionActions) { - return jobMasterServiceSupplier.get(); + return jobMasterServiceFunction.apply(onCompletionActions); } }
