This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 919dca5fd093f8a1bf28383dbaf75677bab595b8 Author: Matthias Pohl <[email protected]> AuthorDate: Thu Feb 3 18:55:55 2022 +0100 [FLINK-25953][runtime] Reorganizes dispatcher cleanup related tests This includes introducing a TestingDispatcher.Builder and aligning the usages of the TestingDispatcher instantiation between tests. Additionally, tests were renamed, obsolete tests were removed and cleanup-related tests moved into DispatcherResourceCleanupTest. --- .../DefaultJobManagerRunnerRegistry.java | 12 +- .../flink/runtime/dispatcher/Dispatcher.java | 69 ++- .../OnMainThreadJobManagerRunnerRegistry.java | 112 ++++ .../cleanup/DispatcherResourceCleanerFactory.java | 2 +- .../runtime/dispatcher/AbstractDispatcherTest.java | 137 +---- .../DefaultJobManagerRunnerRegistryTest.java | 5 +- .../dispatcher/DispatcherFailoverITCase.java | 10 +- .../dispatcher/DispatcherResourceCleanupTest.java | 575 ++++++++++----------- .../flink/runtime/dispatcher/DispatcherTest.java | 180 +------ .../runtime/dispatcher/TestingDispatcher.java | 281 ++++++++++ .../cleanup/TestingResourceCleanerFactory.java | 132 +++++ 11 files changed, 876 insertions(+), 639 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java index 2ba54e9..2eb0b30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.util.Preconditions; import 
org.apache.flink.util.concurrent.FutureUtils; @@ -37,19 +36,15 @@ import java.util.concurrent.Executor; /** * {@code DefaultJobManagerRunnerRegistry} is the default implementation of the {@link - * JobManagerRunnerRegistry} interface. All methods of this class are expected to be called from - * within the main thread. + * JobManagerRunnerRegistry} interface. */ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry { @VisibleForTesting final Map<JobID, JobManagerRunner> jobManagerRunners; - private final ComponentMainThreadExecutor mainThreadExecutor; - public DefaultJobManagerRunnerRegistry( - int initialCapacity, ComponentMainThreadExecutor mainThreadExecutor) { + public DefaultJobManagerRunnerRegistry(int initialCapacity) { Preconditions.checkArgument(initialCapacity > 0); jobManagerRunners = new HashMap<>(initialCapacity); - this.mainThreadExecutor = mainThreadExecutor; } @Override @@ -59,7 +54,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry @Override public void register(JobManagerRunner jobManagerRunner) { - mainThreadExecutor.assertRunningInMainThread(); Preconditions.checkArgument( !isRegistered(jobManagerRunner.getJobID()), "A job with the ID %s is already registered.", @@ -99,7 +93,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry } private CompletableFuture<Void> cleanup(JobID jobId) { - mainThreadExecutor.assertRunningInMainThread(); if (isRegistered(jobId)) { try { unregister(jobId).close(); @@ -113,7 +106,6 @@ public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry @Override public JobManagerRunner unregister(JobID jobId) { - mainThreadExecutor.assertRunningInMainThread(); assertJobRegistered(jobId); return this.jobManagerRunners.remove(jobId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 
f15f294..5db8412 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.operators.ResourceSpec; @@ -97,7 +98,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collectors; @@ -113,6 +113,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher public static final String DISPATCHER_NAME = "dispatcher"; + private static final int INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY = 16; + private final Configuration configuration; private final JobGraphWriter jobGraphWriter; @@ -126,7 +128,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private final FatalErrorHandler fatalErrorHandler; - private final JobManagerRunnerRegistry jobManagerRunnerRegistry; + private final OnMainThreadJobManagerRunnerRegistry jobManagerRunnerRegistry; private final Collection<JobGraph> recoveredJobs; @@ -140,8 +142,6 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private final HistoryServerArchivist historyServerArchivist; - private final Executor ioExecutor; - @Nullable private final String metricServiceQueryAddress; private final Map<JobID, CompletableFuture<Void>> jobManagerRunnerTerminationFutures; @@ -169,8 +169,48 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { + this( + rpcService, + 
fencingToken, + recoveredJobs, + recoveredDirtyJobs, + dispatcherBootstrapFactory, + dispatcherServices, + new DefaultJobManagerRunnerRegistry(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY)); + } + + private Dispatcher( + RpcService rpcService, + DispatcherId fencingToken, + Collection<JobGraph> recoveredJobs, + Collection<JobResult> globallyTerminatedJobs, + DispatcherBootstrapFactory dispatcherBootstrapFactory, + DispatcherServices dispatcherServices, + JobManagerRunnerRegistry jobManagerRunnerRegistry) + throws Exception { + this( + rpcService, + fencingToken, + recoveredJobs, + globallyTerminatedJobs, + dispatcherBootstrapFactory, + dispatcherServices, + jobManagerRunnerRegistry, + new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices)); + } + + @VisibleForTesting + protected Dispatcher( + RpcService rpcService, + DispatcherId fencingToken, + Collection<JobGraph> recoveredJobs, + Collection<JobResult> recoveredDirtyJobs, + DispatcherBootstrapFactory dispatcherBootstrapFactory, + DispatcherServices dispatcherServices, + JobManagerRunnerRegistry jobManagerRunnerRegistry, + ResourceCleanerFactory resourceCleanerFactory) + throws Exception { super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken); - checkNotNull(dispatcherServices); assertRecoveredJobsAndDirtyJobResults(recoveredJobs, recoveredDirtyJobs); this.configuration = dispatcherServices.getConfiguration(); @@ -184,14 +224,14 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher this.jobResultStore = dispatcherServices.getJobResultStore(); this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup(); this.metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress(); - this.ioExecutor = dispatcherServices.getIoExecutor(); this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration( configuration, blobServer, fatalErrorHandler); - jobManagerRunnerRegistry = - new 
DefaultJobManagerRunnerRegistry(16, this.getMainThreadExecutor()); + this.jobManagerRunnerRegistry = + new OnMainThreadJobManagerRunnerRegistry( + jobManagerRunnerRegistry, this.getMainThreadExecutor()); this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist(); @@ -199,7 +239,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory(); - this.jobManagerRunnerTerminationFutures = new HashMap<>(2); + this.jobManagerRunnerTerminationFutures = + new HashMap<>(INITIAL_JOB_MANAGER_RUNNER_REGISTRY_CAPACITY); this.shutDownFuture = new CompletableFuture<>(); @@ -209,7 +250,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher this.blobServer.retainJobs( recoveredJobs.stream().map(JobGraph::getJobID).collect(Collectors.toSet()), - ioExecutor); + dispatcherServices.getIoExecutor()); this.dispatcherCachedOperationsHandler = new DispatcherCachedOperationsHandler( @@ -217,8 +258,6 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher this::triggerSavepointAndGetLocation, this::stopWithSavepointAndGetLocation); - final ResourceCleanerFactory resourceCleanerFactory = - new DispatcherResourceCleanerFactory(jobManagerRunnerRegistry, dispatcherServices); this.localResourceCleaner = resourceCleanerFactory.createLocalResourceCleaner(this.getMainThreadExecutor()); this.globalResourceCleaner = @@ -1119,7 +1158,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) { jobManagerMetricGroup.gauge( - MetricNames.NUM_RUNNING_JOBS, () -> (long) jobManagerRunnerRegistry.size()); + MetricNames.NUM_RUNNING_JOBS, + // metrics can be called from anywhere and therefore, have to run without the main + // thread safeguard being triggered. 
For metrics, we can afford to be not 100% + // accurate + () -> (long) jobManagerRunnerRegistry.getWrappedDelegate().size()); } public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java new file mode 100644 index 0000000..4c16b42 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/OnMainThreadJobManagerRunnerRegistry.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.util.WrappingProxy; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * {@code OnMainThreadJobManagerRunnerRegistry} implements {@link JobManagerRunnerRegistry} guarding + * the passed {@code JobManagerRunnerRegistry} instance in a way that it only allows modifying + * methods to be executed on the component's main thread. + * + * @see ComponentMainThreadExecutor + */ +public class OnMainThreadJobManagerRunnerRegistry + implements JobManagerRunnerRegistry, WrappingProxy<JobManagerRunnerRegistry> { + + private final JobManagerRunnerRegistry delegate; + private final ComponentMainThreadExecutor mainThreadExecutor; + + public OnMainThreadJobManagerRunnerRegistry( + JobManagerRunnerRegistry delegate, ComponentMainThreadExecutor mainThreadExecutor) { + this.delegate = delegate; + this.mainThreadExecutor = mainThreadExecutor; + } + + @Override + public boolean isRegistered(JobID jobId) { + mainThreadExecutor.assertRunningInMainThread(); + return delegate.isRegistered(jobId); + } + + @Override + public void register(JobManagerRunner jobManagerRunner) { + mainThreadExecutor.assertRunningInMainThread(); + delegate.register(jobManagerRunner); + } + + @Override + public JobManagerRunner get(JobID jobId) { + mainThreadExecutor.assertRunningInMainThread(); + return delegate.get(jobId); + } + + @Override + public int size() { + mainThreadExecutor.assertRunningInMainThread(); + return delegate.size(); + } + + @Override + public Set<JobID> getRunningJobIds() { + mainThreadExecutor.assertRunningInMainThread(); + return delegate.getRunningJobIds(); + } + + @Override + public Collection<JobManagerRunner> getJobManagerRunners() { + 
mainThreadExecutor.assertRunningInMainThread(); + return delegate.getJobManagerRunners(); + } + + @Override + public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { + mainThreadExecutor.assertRunningInMainThread(); + return delegate.globalCleanupAsync(jobId, executor); + } + + @Override + public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) { + mainThreadExecutor.assertRunningInMainThread(); + return delegate.localCleanupAsync(jobId, executor); + } + + @Override + public JobManagerRunner unregister(JobID jobId) { + mainThreadExecutor.assertRunningInMainThread(); + return delegate.unregister(jobId); + } + + /** + * Returns the delegated {@link JobManagerRunnerRegistry}. This method can be used to workaround + * the main thread safeguard. + */ + @Override + public JobManagerRunnerRegistry getWrappedDelegate() { + return this.delegate; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java index 3faa358..0957a42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactory.java @@ -62,7 +62,7 @@ public class DispatcherResourceCleanerFactory implements ResourceCleanerFactory } @VisibleForTesting - DispatcherResourceCleanerFactory( + public DispatcherResourceCleanerFactory( Executor cleanupExecutor, JobManagerRunnerRegistry jobManagerRunnerRegistry, JobGraphWriter jobGraphWriter, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java index f277e2b..75d638b 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/AbstractDispatcherTest.java @@ -26,18 +26,10 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.heartbeat.HeartbeatServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.JobResultStore; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmanager.JobGraphWriter; import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore; -import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; -import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.testutils.CommonTestUtils; @@ -53,11 +45,6 @@ import org.junit.Rule; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestName; -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ForkJoinPool; - /** Abstract test for the {@link Dispatcher} component. 
*/ public class AbstractDispatcherTest extends TestLogger { @@ -116,6 +103,19 @@ public class AbstractDispatcherTest extends TestLogger { new BlobServer(configuration, temporaryFolder.newFolder(), new VoidBlobStore()); } + protected TestingDispatcher.Builder createTestingDispatcherBuilder() { + return TestingDispatcher.builder() + .setRpcService(rpcService) + .setConfiguration(configuration) + .setHeartbeatServices(heartbeatServices) + .setHighAvailabilityServices(haServices) + .setJobGraphWriter(haServices.getJobGraphStore()) + .setJobResultStore(haServices.getJobResultStore()) + .setJobManagerRunnerFactory(JobMasterServiceLeadershipRunnerFactory.INSTANCE) + .setFatalErrorHandler(testingFatalErrorHandlerResource.getFatalErrorHandler()) + .setBlobServer(blobServer); + } + @After public void tearDown() throws Exception { if (haServices != null) { @@ -129,115 +129,4 @@ public class AbstractDispatcherTest extends TestLogger { protected BlobServer getBlobServer() { return blobServer; } - - /** A convenient builder for the {@link TestingDispatcher}. 
*/ - public class TestingDispatcherBuilder { - - private Collection<JobGraph> initialJobGraphs = Collections.emptyList(); - - private Collection<JobResult> dirtyJobResults = Collections.emptyList(); - - private DispatcherBootstrapFactory dispatcherBootstrapFactory = - (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap(); - - private HeartbeatServices heartbeatServices = AbstractDispatcherTest.this.heartbeatServices; - - private HighAvailabilityServices haServices = AbstractDispatcherTest.this.haServices; - - private JobManagerRunnerFactory jobManagerRunnerFactory = - JobMasterServiceLeadershipRunnerFactory.INSTANCE; - - private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE; - - private JobResultStore jobResultStore = new EmbeddedJobResultStore(); - - private FatalErrorHandler fatalErrorHandler = - testingFatalErrorHandlerResource.getFatalErrorHandler(); - - private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE; - - TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) { - this.heartbeatServices = heartbeatServices; - return this; - } - - TestingDispatcherBuilder setHaServices(HighAvailabilityServices haServices) { - this.haServices = haServices; - return this; - } - - TestingDispatcherBuilder setInitialJobGraphs(Collection<JobGraph> initialJobGraphs) { - this.initialJobGraphs = initialJobGraphs; - return this; - } - - TestingDispatcherBuilder setDirtyJobResults(Collection<JobResult> dirtyJobResults) { - this.dirtyJobResults = dirtyJobResults; - return this; - } - - TestingDispatcherBuilder setDispatcherBootstrapFactory( - DispatcherBootstrapFactory dispatcherBootstrapFactory) { - this.dispatcherBootstrapFactory = dispatcherBootstrapFactory; - return this; - } - - TestingDispatcherBuilder setJobManagerRunnerFactory( - JobManagerRunnerFactory jobManagerRunnerFactory) { - this.jobManagerRunnerFactory = jobManagerRunnerFactory; - return this; - } - - 
TestingDispatcherBuilder setJobGraphWriter(JobGraphWriter jobGraphWriter) { - this.jobGraphWriter = jobGraphWriter; - return this; - } - - TestingDispatcherBuilder setJobResultStore(JobResultStore jobResultStore) { - this.jobResultStore = jobResultStore; - return this; - } - - public TestingDispatcherBuilder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) { - this.fatalErrorHandler = fatalErrorHandler; - return this; - } - - public TestingDispatcherBuilder setHistoryServerArchivist( - HistoryServerArchivist historyServerArchivist) { - this.historyServerArchivist = historyServerArchivist; - return this; - } - - TestingDispatcher build() throws Exception { - TestingResourceManagerGateway resourceManagerGateway = - new TestingResourceManagerGateway(); - - final MemoryExecutionGraphInfoStore executionGraphInfoStore = - new MemoryExecutionGraphInfoStore(); - - return new TestingDispatcher( - rpcService, - DispatcherId.generate(), - initialJobGraphs, - dirtyJobResults, - dispatcherBootstrapFactory, - new DispatcherServices( - configuration, - haServices, - () -> CompletableFuture.completedFuture(resourceManagerGateway), - blobServer, - heartbeatServices, - executionGraphInfoStore, - fatalErrorHandler, - historyServerArchivist, - null, - new DispatcherOperationCaches(), - UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), - jobGraphWriter, - jobResultStore, - jobManagerRunnerFactory, - ForkJoinPool.commonPool())); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java index 6eca873..1d4cec9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java @@ -20,7 +20,6 @@ package 
org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.core.testutils.FlinkAssertions; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; import org.apache.flink.util.FlinkException; @@ -47,9 +46,7 @@ public class DefaultJobManagerRunnerRegistryTest { @BeforeEach public void setup() { - testInstance = - new DefaultJobManagerRunnerRegistry( - 4, ComponentMainThreadExecutorServiceAdapter.forMainThread()); + testInstance = new DefaultJobManagerRunnerRegistry(4); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java index fb9ac8a..6f3bcce 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java @@ -241,13 +241,9 @@ public class DispatcherFailoverITCase extends AbstractDispatcherTest { } } final TestingDispatcher dispatcher = - new TestingDispatcherBuilder() - .setJobManagerRunnerFactory( - JobMasterServiceLeadershipRunnerFactory.INSTANCE) - .setJobGraphWriter(haServices.getJobGraphStore()) - .setJobResultStore(haServices.getJobResultStore()) - .setInitialJobGraphs(jobGraphs) - .setDirtyJobResults(haServices.getJobResultStore().getDirtyResults()) + createTestingDispatcherBuilder() + .setRecoveredJobs(jobGraphs) + .setRecoveredDirtyJobs(haServices.getJobResultStore().getDirtyResults()) .setFatalErrorHandler( fatalErrorHandler == null ? 
testingFatalErrorHandlerResource.getFatalErrorHandler() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index a4063b6..127f98c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -22,34 +22,27 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.testutils.FlinkMatchers; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.blob.BlobServer; -import org.apache.flink.runtime.blob.BlobStore; -import org.apache.flink.runtime.blob.PermanentBlobKey; -import org.apache.flink.runtime.blob.TestingBlobStore; +import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.blob.TestingBlobStoreBuilder; import org.apache.flink.runtime.client.DuplicateJobSubmissionException; import org.apache.flink.runtime.client.JobSubmissionException; -import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.dispatcher.cleanup.TestingResourceCleanerFactory; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.JobResultEntry; import org.apache.flink.runtime.highavailability.JobResultStore; -import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils; -import org.apache.flink.runtime.jobmanager.JobGraphWriter; import org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; -import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; +import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -60,8 +53,10 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.Reference; import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.ThrowingRunnable; import org.hamcrest.core.IsInstanceOf; import org.junit.After; @@ -74,28 +69,23 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; -import javax.annotation.Nullable; - -import java.io.File; import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; -import java.util.Collections; import java.util.Optional; import java.util.Queue; -import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.function.BiFunction; +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; +import static org.apache.flink.runtime.dispatcher.AbstractDispatcherTest.awaitStatus; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** Tests the resource cleanup by the {@link Dispatcher}. */ @@ -117,30 +107,14 @@ public class DispatcherResourceCleanupTest extends TestLogger { private JobGraph jobGraph; - private Configuration configuration; - - private JobResultStore jobResultStore; - - private TestingHighAvailabilityServices highAvailabilityServices; - - private OneShotLatch clearedJobLatch; - private TestingDispatcher dispatcher; private DispatcherGateway dispatcherGateway; private BlobServer blobServer; - private PermanentBlobKey permanentBlobKey; - - private File blobFile; - - private CompletableFuture<BlobKey> storedHABlobFuture; - private CompletableFuture<JobID> deleteAllHABlobsFuture; private CompletableFuture<JobID> localCleanupFuture; private CompletableFuture<JobID> globalCleanupFuture; - private CompletableFuture<JobID> cleanupJobHADataFuture; - private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE; @BeforeClass public static void setupClass() { @@ -152,41 +126,14 @@ public class DispatcherResourceCleanupTest extends TestLogger { jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); jobId = jobGraph.getJobID(); - configuration = new Configuration(); - - highAvailabilityServices = new TestingHighAvailabilityServices(); - clearedJobLatch = new OneShotLatch(); - jobResultStore = new 
SingleJobResultStore(jobId, clearedJobLatch); - highAvailabilityServices.setJobResultStore(jobResultStore); - cleanupJobHADataFuture = new CompletableFuture<>(); - highAvailabilityServices.setGlobalCleanupFuture(cleanupJobHADataFuture); - - storedHABlobFuture = new CompletableFuture<>(); - deleteAllHABlobsFuture = new CompletableFuture<>(); - - final TestingBlobStore testingBlobStore = - new TestingBlobStoreBuilder() - .setPutFunction( - (file, jobId, blobKey) -> storedHABlobFuture.complete(blobKey)) - .setDeleteAllFunction(deleteAllHABlobsFuture::complete) - .createTestingBlobStore(); - globalCleanupFuture = new CompletableFuture<>(); localCleanupFuture = new CompletableFuture<>(); blobServer = - new TestingBlobServer( - configuration, - temporaryFolder.newFolder(), - testingBlobStore, - (jobId, ignoredExecutor) -> { - globalCleanupFuture.complete(jobId); - return FutureUtils.completedVoidFuture(); - }, - (jobId, ignoredExecutor) -> { - localCleanupFuture.complete(jobId); - return FutureUtils.completedVoidFuture(); - }); + BlobUtils.createBlobServer( + new Configuration(), + Reference.owned(temporaryFolder.newFolder()), + new TestingBlobStoreBuilder().createTestingBlobStore()); } private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob() throws Exception { @@ -195,54 +142,72 @@ public class DispatcherResourceCleanupTest extends TestLogger { private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob( int numBlockingJobManagerRunners) throws Exception { + return startDispatcherAndSubmitJob( + createTestingDispatcherBuilder(), numBlockingJobManagerRunners); + } + + private TestingJobManagerRunnerFactory startDispatcherAndSubmitJob( + TestingDispatcher.Builder dispatcherBuilder, int numBlockingJobManagerRunners) + throws Exception { final TestingJobManagerRunnerFactory testingJobManagerRunnerFactoryNG = new TestingJobManagerRunnerFactory(numBlockingJobManagerRunners); - startDispatcher(testingJobManagerRunnerFactoryNG); + 
startDispatcher(dispatcherBuilder, testingJobManagerRunnerFactoryNG); submitJobAndWait(); return testingJobManagerRunnerFactoryNG; } private void startDispatcher(JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception { - TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); - final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); - final MemoryExecutionGraphInfoStore archivedExecutionGraphStore = - new MemoryExecutionGraphInfoStore(); - dispatcher = - new TestingDispatcher( - rpcService, - DispatcherId.generate(), - Collections.emptyList(), - Collections.emptyList(), - (dispatcher, scheduledExecutor, errorHandler) -> - new NoOpDispatcherBootstrap(), - new DispatcherServices( - configuration, - highAvailabilityServices, - () -> CompletableFuture.completedFuture(resourceManagerGateway), - blobServer, - heartbeatServices, - archivedExecutionGraphStore, - testingFatalErrorHandlerResource.getFatalErrorHandler(), - VoidHistoryServerArchivist.INSTANCE, - null, - new DispatcherOperationCaches(), - UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), - jobGraphWriter, - jobResultStore, - jobManagerRunnerFactory, - ForkJoinPool.commonPool())); + startDispatcher(createTestingDispatcherBuilder(), jobManagerRunnerFactory); + } + + private void startDispatcher( + TestingDispatcher.Builder dispatcherBuilder, + JobManagerRunnerFactory jobManagerRunnerFactory) + throws Exception { + dispatcher = dispatcherBuilder.setJobManagerRunnerFactory(jobManagerRunnerFactory).build(); dispatcher.start(); dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class); } + private TestingDispatcher.Builder createTestingDispatcherBuilder() { + final JobManagerRunnerRegistry jobManagerRunnerRegistry = + new DefaultJobManagerRunnerRegistry(2); + return TestingDispatcher.builder() + .setRpcService(rpcService) + .setBlobServer(blobServer) + .setJobManagerRunnerRegistry(jobManagerRunnerRegistry) + 
.setFatalErrorHandler(testingFatalErrorHandlerResource.getFatalErrorHandler()) + .setResourceCleanerFactory( + TestingResourceCleanerFactory.builder() + // JobManagerRunnerRegistry needs to be added explicitly + // because cleaning it will trigger the closeAsync latch + // provided by TestingJobManagerRunner + .withLocallyCleanableResource(jobManagerRunnerRegistry) + .withGloballyCleanableResource( + (jobId, ignoredExecutor) -> { + globalCleanupFuture.complete(jobId); + return FutureUtils.completedVoidFuture(); + }) + .withLocallyCleanableResource( + (jobId, ignoredExecutor) -> { + localCleanupFuture.complete(jobId); + return FutureUtils.completedVoidFuture(); + }) + .build()); + } + @After public void teardown() throws Exception { if (dispatcher != null) { dispatcher.close(); } + + if (blobServer != null) { + blobServer.close(); + } } @AfterClass @@ -253,41 +218,29 @@ public class DispatcherResourceCleanupTest extends TestLogger { } @Test - public void testBlobServerCleanupWhenJobFinished() throws Exception { + public void testGlobalCleanupWhenJobFinished() throws Exception { final TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob(); // complete the job finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner()); - assertThatHABlobsHaveBeenRemoved(); + assertGlobalCleanupTriggered(jobId); } - private void assertThatHABlobsHaveBeenRemoved() - throws InterruptedException, ExecutionException, TimeoutException { - assertGlobalCleanupTriggered(jobId); + @Test + public void testGlobalCleanupWhenJobCanceled() throws Exception { + final TestingJobManagerRunnerFactory jobManagerRunnerFactory = + startDispatcherAndSubmitJob(); - // verify that we also cleared the BlobStore - assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId)); + // complete the job + cancelJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner()); - assertThat(blobFile.exists(), is(false)); + assertGlobalCleanupTriggered(jobId); } private 
CompletableFuture<Acknowledge> submitJob() { - try { - // upload a blob to the blob server - permanentBlobKey = blobServer.putPermanent(jobId, new byte[256]); - jobGraph.addUserJarBlobKey(permanentBlobKey); - blobFile = blobServer.getStorageLocation(jobId, permanentBlobKey); - - assertThat(blobFile.exists(), is(true)); - - // verify that we stored the blob also in the BlobStore - assertThat(storedHABlobFuture.join(), equalTo(permanentBlobKey)); - return dispatcherGateway.submitJob(jobGraph, timeout); - } catch (IOException ioe) { - return FutureUtils.completedExceptionally(ioe); - } + return dispatcherGateway.submitJob(jobGraph, timeout); } private void submitJobAndWait() { @@ -295,7 +248,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { } @Test - public void testBlobServerCleanupWhenJobNotFinished() throws Exception { + public void testLocalCleanupWhenJobNotFinished() throws Exception { final TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob(); @@ -305,22 +258,10 @@ public class DispatcherResourceCleanupTest extends TestLogger { suspendJob(testingJobManagerRunner); assertLocalCleanupTriggered(jobId); - assertThat(blobFile.exists(), is(false)); - - // verify that we did not clear the BlobStore - try { - deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS); - fail("We should not delete the HA blobs."); - } catch (TimeoutException ignored) { - // expected - } - - assertThat(deleteAllHABlobsFuture.isDone(), is(false)); } - /** Tests that the uploaded blobs are being cleaned up in case of a job submission failure. 
*/ @Test - public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception { + public void testGlobalCleanupWhenJobSubmissionFails() throws Exception { startDispatcher(new FailingJobManagerRunnerFactory(new FlinkException("Test exception"))); final CompletableFuture<Acknowledge> submissionFuture = submitJob(); @@ -328,34 +269,23 @@ public class DispatcherResourceCleanupTest extends TestLogger { submissionFuture.get(); fail("Job submission was expected to fail."); } catch (ExecutionException ee) { - assertThat(ee, FlinkMatchers.containsCause(JobSubmissionException.class)); + assertThat(ee, containsCause(JobSubmissionException.class)); } - assertThatHABlobsHaveBeenRemoved(); + assertGlobalCleanupTriggered(jobId); } @Test - public void testBlobServerCleanupWhenClosingDispatcher() throws Exception { + public void testLocalCleanupWhenClosingDispatcher() throws Exception { startDispatcherAndSubmitJob(); dispatcher.closeAsync().get(); assertLocalCleanupTriggered(jobId); - assertThat(blobFile.exists(), is(false)); - - // verify that we did not clear the BlobStore - try { - deleteAllHABlobsFuture.get(50L, TimeUnit.MILLISECONDS); - fail("We should not delete the HA blobs."); - } catch (TimeoutException ignored) { - // expected - } - - assertThat(deleteAllHABlobsFuture.isDone(), is(false)); } @Test - public void testHACleanupWhenJobFinishedWhileClosingDispatcher() throws Exception { + public void testGlobalCleanupWhenJobFinishedWhileClosingDispatcher() throws Exception { final TestingJobManagerRunner testingJobManagerRunner = TestingJobManagerRunner.newBuilder() .setBlockingTermination(true) @@ -384,38 +314,89 @@ public class DispatcherResourceCleanupTest extends TestLogger { dispatcherTerminationFuture.get(); assertGlobalCleanupTriggered(jobId); - assertThat(deleteAllHABlobsFuture.get(), is(jobId)); } - /** - * Tests that the {@link JobResultStore} entries are marked as clean after the job reached a - * terminal state. 
- */ @Test - public void testJobResultStoreCleanup() throws Exception { + public void testJobBeingMarkedAsDirtyBeforeCleanup() throws Exception { + final OneShotLatch markAsDirtyLatch = new OneShotLatch(); + + final TestingDispatcher.Builder dispatcherBuilder = + createTestingDispatcherBuilder() + .setJobResultStore( + TestingJobResultStore.builder() + .withCreateDirtyResultConsumer( + ignoredJobResultEntry -> { + try { + markAsDirtyLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .build()); + final TestingJobManagerRunnerFactory jobManagerRunnerFactory = - startDispatcherAndSubmitJob(); + startDispatcherAndSubmitJob(dispatcherBuilder, 0); - final JobResult jobResult = - TestingJobResultStore.createJobResult(jobId, ApplicationStatus.UNKNOWN); + finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner()); - jobResultStore.createDirtyResult(new JobResultEntry(jobResult)); - assertTrue(jobResultStore.hasJobResultEntry(jobId)); + assertThatNoCleanupWasTriggered(); - final TestingJobManagerRunner testingJobManagerRunner = - jobManagerRunnerFactory.takeCreatedJobManagerRunner(); - testingJobManagerRunner.completeResultFuture( - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder() - .setState(JobStatus.FINISHED) - .setJobID(jobId) - .build())); + markAsDirtyLatch.trigger(); - // wait for the clearing - clearedJobLatch.await(); + assertGlobalCleanupTriggered(jobId); + } - assertTrue(jobResultStore.hasJobResultEntry(jobId)); - assertTrue(jobResultStore.getDirtyResults().isEmpty()); + @Test + public void testJobBeingMarkedAsCleanAfterCleanup() throws Exception { + final CompletableFuture<JobID> markAsCleanFuture = new CompletableFuture<>(); + + final JobResultStore jobResultStore = + TestingJobResultStore.builder() + .withMarkResultAsCleanConsumer(markAsCleanFuture::complete) + .build(); + final OneShotLatch localCleanupLatch = new OneShotLatch(); + final OneShotLatch globalCleanupLatch = new OneShotLatch(); + 
final TestingResourceCleanerFactory resourceCleanerFactory = + TestingResourceCleanerFactory.builder() + .withLocallyCleanableResource( + (ignoredJobId, ignoredExecutor) -> { + try { + localCleanupLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return FutureUtils.completedVoidFuture(); + }) + .withGloballyCleanableResource( + (ignoredJobId, ignoredExecutor) -> { + try { + globalCleanupLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + return FutureUtils.completedVoidFuture(); + }) + .build(); + + final TestingDispatcher.Builder dispatcherBuilder = + createTestingDispatcherBuilder() + .setJobResultStore(jobResultStore) + .setResourceCleanerFactory(resourceCleanerFactory); + + final TestingJobManagerRunnerFactory jobManagerRunnerFactory = + startDispatcherAndSubmitJob(dispatcherBuilder, 0); + + finishJob(jobManagerRunnerFactory.takeCreatedJobManagerRunner()); + + assertThat(markAsCleanFuture.isDone(), is(false)); + + localCleanupLatch.trigger(); + assertThat(markAsCleanFuture.isDone(), is(false)); + globalCleanupLatch.trigger(); + + assertThat(markAsCleanFuture.get(), is(jobId)); } /** @@ -481,32 +462,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { finishJob(testingJobManagerRunnerFactoryNG.takeCreatedJobManagerRunner()); } - assertThatHABlobsHaveBeenRemoved(); - } - - @Test - public void testHaDataCleanupWhenJobFinished() throws Exception { - TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob(); - TestingJobManagerRunner jobManagerRunner = - jobManagerRunnerFactory.takeCreatedJobManagerRunner(); - finishJob(jobManagerRunner); - JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS); - assertThat(jobID, is(this.jobId)); - } - - @Test - public void testHaDataCleanupWhenJobNotFinished() throws Exception { - TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob(); - 
TestingJobManagerRunner jobManagerRunner = - jobManagerRunnerFactory.takeCreatedJobManagerRunner(); - suspendJob(jobManagerRunner); - try { - cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS); - fail("We should not delete the HA data for job."); - } catch (TimeoutException ignored) { - // expected - } - assertThat(cleanupJobHADataFuture.isDone(), is(false)); + assertGlobalCleanupTriggered(jobId); } private void finishJob(TestingJobManagerRunner takeCreatedJobManagerRunner) { @@ -517,6 +473,10 @@ public class DispatcherResourceCleanupTest extends TestLogger { terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.SUSPENDED); } + private void cancelJob(TestingJobManagerRunner takeCreatedJobManagerRunner) { + terminateJobWithState(takeCreatedJobManagerRunner, JobStatus.CANCELED); + } + private void terminateJobWithState( TestingJobManagerRunner takeCreatedJobManagerRunner, JobStatus state) { takeCreatedJobManagerRunner.completeResultFuture( @@ -530,8 +490,6 @@ public class DispatcherResourceCleanupTest extends TestLogger { private void assertThatNoCleanupWasTriggered() { assertThat(globalCleanupFuture.isDone(), is(false)); assertThat(localCleanupFuture.isDone(), is(false)); - assertThat(deleteAllHABlobsFuture.isDone(), is(false)); - assertThat(blobFile.exists(), is(true)); } @Test @@ -566,84 +524,6 @@ public class DispatcherResourceCleanupTest extends TestLogger { dispatcherTerminationFuture.get(); } - private static final class SingleJobResultStore implements JobResultStore { - - private final JobID expectedJobId; - @Nullable private JobResultEntry actualJobResultEntry; - private boolean isDirty = true; - - private final OneShotLatch clearedJobLatch; - - private SingleJobResultStore(JobID expectedJobId, OneShotLatch clearedJobLatch) { - this.expectedJobId = expectedJobId; - this.clearedJobLatch = clearedJobLatch; - } - - @Override - public void createDirtyResult(JobResultEntry jobResultEntry) throws IOException { - checkJobId(jobResultEntry.getJobId()); - 
this.actualJobResultEntry = jobResultEntry; - } - - private void checkJobId(JobID jobID) { - Preconditions.checkArgument(expectedJobId.equals(jobID)); - } - - @Override - public void markResultAsClean(JobID jobId) throws IOException { - checkJobId(jobId); - Preconditions.checkNotNull(actualJobResultEntry); - isDirty = false; - clearedJobLatch.trigger(); - } - - @Override - public boolean hasDirtyJobResultEntry(JobID jobId) throws IOException { - if (actualJobResultEntry == null) { - return false; - } - - checkJobId(jobId); - return isDirty; - } - - @Override - public boolean hasCleanJobResultEntry(JobID jobId) throws IOException { - if (actualJobResultEntry == null) { - return false; - } - - checkJobId(jobId); - return !isDirty; - } - - @Override - public Set<JobResult> getDirtyResults() throws IOException { - return actualJobResultEntry != null && isDirty - ? Collections.singleton(actualJobResultEntry.getJobResult()) - : Collections.emptySet(); - } - } - - @Test - public void testHABlobsAreRemovedIfHAJobGraphRemovalSucceeds() throws Exception { - final TestingJobManagerRunnerFactory jobManagerRunnerFactory = - startDispatcherAndSubmitJob(); - - ArchivedExecutionGraph executionGraph = - new ArchivedExecutionGraphBuilder() - .setJobID(jobId) - .setState(JobStatus.CANCELED) - .build(); - - final TestingJobManagerRunner testingJobManagerRunner = - jobManagerRunnerFactory.takeCreatedJobManagerRunner(); - testingJobManagerRunner.completeResultFuture(new ExecutionGraphInfo(executionGraph)); - - assertGlobalCleanupTriggered(jobId); - assertThat(deleteAllHABlobsFuture.get(), equalTo(jobId)); - } - private void assertLocalCleanupTriggered(JobID jobId) throws ExecutionException, InterruptedException, TimeoutException { assertThat(localCleanupFuture.get(100, TimeUnit.MILLISECONDS), equalTo(jobId)); @@ -658,17 +538,17 @@ public class DispatcherResourceCleanupTest extends TestLogger { @Test public void testFatalErrorIfJobCannotBeMarkedDirtyInJobResultStore() throws Exception { 
- jobResultStore = + final JobResultStore jobResultStore = TestingJobResultStore.builder() .withCreateDirtyResultConsumer( jobResult -> { throw new IOException("Expected IOException."); }) .build(); - highAvailabilityServices.setJobResultStore(jobResultStore); final TestingJobManagerRunnerFactory jobManagerRunnerFactory = - startDispatcherAndSubmitJob(); + startDispatcherAndSubmitJob( + createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0); ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder() @@ -691,7 +571,7 @@ public class DispatcherResourceCleanupTest extends TestLogger { @Test public void testErrorHandlingIfJobCannotBeMarkedAsCleanInJobResultStore() throws Exception { final CompletableFuture<JobResultEntry> dirtyJobFuture = new CompletableFuture<>(); - jobResultStore = + final JobResultStore jobResultStore = TestingJobResultStore.builder() .withCreateDirtyResultConsumer(dirtyJobFuture::complete) .withMarkResultAsCleanConsumer( @@ -699,10 +579,10 @@ public class DispatcherResourceCleanupTest extends TestLogger { throw new IOException("Expected IOException."); }) .build(); - highAvailabilityServices.setJobResultStore(jobResultStore); final TestingJobManagerRunnerFactory jobManagerRunnerFactory = - startDispatcherAndSubmitJob(); + startDispatcherAndSubmitJob( + createTestingDispatcherBuilder().setJobResultStore(jobResultStore), 0); ArchivedExecutionGraph executionGraph = new ArchivedExecutionGraphBuilder() @@ -729,45 +609,108 @@ public class DispatcherResourceCleanupTest extends TestLogger { assertThat(dirtyJobFuture.get().getJobId(), is(jobId)); } - private static final class TestingBlobServer extends BlobServer { - - private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction; - private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction; - - /** - * Instantiates a new BLOB server and binds it to a free network port. 
- * - * @param config Configuration to be used to instantiate the BlobServer - * @param blobStore BlobStore to store blobs persistently - * @param globalCleanupFunction The function called along the actual {@link - * #globalCleanupAsync(JobID, Executor)} call. - * @param localCleanupFunction The function called along the actual {@link - * #localCleanupAsync(JobID, Executor)} call. - * @throws IOException thrown if the BLOB server cannot bind to a free network port or if - * the (local or distributed) file storage cannot be created or is not usable - */ - public TestingBlobServer( - Configuration config, - File storageDirectory, - BlobStore blobStore, - BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupFunction, - BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupFunction) - throws IOException { - super(config, storageDirectory, blobStore); - this.globalCleanupFunction = globalCleanupFunction; - this.localCleanupFunction = localCleanupFunction; + /** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. 
*/ + @Test + public void testFailingJobManagerRunnerCleanup() throws Exception { + final FlinkException testException = new FlinkException("Test exception."); + final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2); + + final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory = + new BlockingJobManagerRunnerFactory( + () -> { + final Optional<Exception> maybeException = queue.take(); + if (maybeException.isPresent()) { + throw maybeException.get(); + } + }); + + startDispatcher(blockingJobManagerRunnerFactory); + + final DispatcherGateway dispatcherGateway = + dispatcher.getSelfGateway(DispatcherGateway.class); + + // submit and fail during job master runner construction + queue.offer(Optional.of(testException)); + try { + dispatcherGateway.submitJob(jobGraph, Time.minutes(1)).get(); + fail("A FlinkException is expected"); + } catch (Throwable expectedException) { + assertThat(expectedException, containsCause(FlinkException.class)); + assertThat(expectedException, containsMessage(testException.getMessage())); + // make sure we've cleaned up in correct order (including HA) + assertGlobalCleanupTriggered(jobId); } - @Override - public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { - return super.globalCleanupAsync(jobId, executor) - .thenCompose(ignored -> globalCleanupFunction.apply(jobId, executor)); + // don't fail this time + queue.offer(Optional.empty()); + // submit job again + dispatcherGateway.submitJob(jobGraph, Time.minutes(1L)).get(); + blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING); + + // Ensure job is running + awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING); + } + + private static final class BlockingJobManagerRunnerFactory + extends TestingJobManagerRunnerFactory { + + private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch; + private TestingJobManagerRunner testingRunner; + + BlockingJobManagerRunnerFactory(ThrowingRunnable<Exception> 
jobManagerRunnerCreationLatch) { + this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch; } @Override - public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) { - return super.localCleanupAsync(jobId, executor) - .thenCompose(ignored -> localCleanupFunction.apply(jobId, executor)); + public TestingJobManagerRunner createJobManagerRunner( + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + JobManagerSharedServices jobManagerSharedServices, + JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, + FatalErrorHandler fatalErrorHandler, + long initializationTimestamp) + throws Exception { + jobManagerRunnerCreationLatch.run(); + + this.testingRunner = + super.createJobManagerRunner( + jobGraph, + configuration, + rpcService, + highAvailabilityServices, + heartbeatServices, + jobManagerSharedServices, + jobManagerJobMetricGroupFactory, + fatalErrorHandler, + initializationTimestamp); + + TestingJobMasterGateway testingJobMasterGateway = + new TestingJobMasterGatewayBuilder() + .setRequestJobSupplier( + () -> + CompletableFuture.completedFuture( + new ExecutionGraphInfo( + ArchivedExecutionGraph + .createFromInitializingJob( + jobGraph.getJobID(), + jobGraph.getName(), + JobStatus.RUNNING, + null, + null, + 1337)))) + .build(); + testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway); + return testingRunner; + } + + public void setJobStatus(JobStatus newStatus) { + Preconditions.checkState( + testingRunner != null, + "JobManagerRunner must be created before this method is available"); + this.testingRunner.setJobStatus(newStatus); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 6f4898a..bad61cd 100755 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -59,7 +59,6 @@ import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProce import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory; import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory; -import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; @@ -89,7 +88,6 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TimeUtils; import org.apache.flink.util.concurrent.FutureUtils; -import org.apache.flink.util.function.ThrowingRunnable; import org.assertj.core.api.Assertions; import org.hamcrest.Matchers; @@ -110,11 +108,9 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Optional; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; @@ -127,12 +123,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; -import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import 
static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -178,8 +172,8 @@ public class DispatcherTest extends AbstractDispatcherTest { JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception { final TestingDispatcher dispatcher = - new TestingDispatcherBuilder() - .setHaServices(haServices) + createTestingDispatcherBuilder() + .setHighAvailabilityServices(haServices) .setHeartbeatServices(heartbeatServices) .setJobManagerRunnerFactory(jobManagerRunnerFactory) .setJobGraphWriter(haServices.getJobGraphStore()) @@ -246,11 +240,11 @@ public class DispatcherTest extends AbstractDispatcherTest { @Test public void testDuplicateJobSubmissionWithRunningJobId() throws Exception { dispatcher = - new TestingDispatcherBuilder() + createTestingDispatcherBuilder() .setJobManagerRunnerFactory( new ExpectedJobIdJobManagerRunnerFactory( jobId, createdJobManagerRunnerLatch)) - .setInitialJobGraphs(Collections.singleton(jobGraph)) + .setRecoveredJobs(Collections.singleton(jobGraph)) .build(); dispatcher.start(); final DispatcherGateway dispatcherGateway = @@ -474,7 +468,7 @@ public class DispatcherTest extends AbstractDispatcherTest { final CompletableFuture<JobManagerRunnerResult> jobTerminationFuture = new CompletableFuture<>(); dispatcher = - new TestingDispatcherBuilder() + createTestingDispatcherBuilder() .setJobManagerRunnerFactory( new FinishingJobManagerRunnerFactory( jobTerminationFuture, () -> {})) @@ -675,10 +669,9 @@ public class DispatcherTest extends AbstractDispatcherTest { final TestingJobManagerRunnerFactory jobManagerRunnerFactory = new TestingJobManagerRunnerFactory(); dispatcher = - new TestingDispatcherBuilder() + createTestingDispatcherBuilder() .setJobManagerRunnerFactory(jobManagerRunnerFactory) - .setInitialJobGraphs( - 
Collections.singleton(JobGraphTestUtils.emptyJobGraph())) + .setRecoveredJobs(Collections.singleton(JobGraphTestUtils.emptyJobGraph())) .build(); dispatcher.start(); @@ -721,89 +714,12 @@ public class DispatcherTest extends AbstractDispatcherTest { final JobResult jobResult = TestingJobResultStore.createSuccessfulJobResult(jobGraph.getJobID()); dispatcher = - new TestingDispatcherBuilder() - .setInitialJobGraphs(Collections.singleton(jobGraph)) - .setDirtyJobResults(Collections.singleton(jobResult)) + createTestingDispatcherBuilder() + .setRecoveredJobs(Collections.singleton(jobGraph)) + .setRecoveredDirtyJobs(Collections.singleton(jobResult)) .build(); } - /** Tests that a failing {@link JobManagerRunner} will be properly cleaned up. */ - @Test - public void testFailingJobManagerRunnerCleanup() throws Exception { - final FlinkException testException = new FlinkException("Test exception."); - final ArrayBlockingQueue<Optional<Exception>> queue = new ArrayBlockingQueue<>(2); - - final BlockingJobManagerRunnerFactory blockingJobManagerRunnerFactory = - new BlockingJobManagerRunnerFactory( - () -> { - final Optional<Exception> maybeException = queue.take(); - if (maybeException.isPresent()) { - throw maybeException.get(); - } - }); - - final BlockingQueue<String> cleanUpEvents = new LinkedBlockingQueue<>(); - - // Track cleanup - ha-services - final CompletableFuture<JobID> cleanupJobData = new CompletableFuture<>(); - haServices.setGlobalCleanupFuture(cleanupJobData); - cleanupJobData.thenAccept(jobId -> cleanUpEvents.add(CLEANUP_HA_SERVICES)); - - // Track cleanup - job-graph - final TestingJobGraphStore jobGraphStore = - TestingJobGraphStore.newBuilder() - .setLocalCleanupFunction( - (jobId, executor) -> { - cleanUpEvents.add(CLEANUP_JOB_GRAPH_RELEASE); - return FutureUtils.completedVoidFuture(); - }) - .setGlobalCleanupFunction( - (jobId, executor) -> { - cleanUpEvents.add(CLEANUP_JOB_GRAPH_REMOVE); - return FutureUtils.completedVoidFuture(); - }) - .build(); - 
jobGraphStore.start(null); - haServices.setJobGraphStore(jobGraphStore); - - // Track cleanup - job result store - haServices.setJobResultStore( - TestingJobResultStore.builder() - .withMarkResultAsCleanConsumer( - jobID -> cleanUpEvents.add(CLEANUP_JOB_RESULT_STORE)) - .build()); - - dispatcher = - createAndStartDispatcher( - heartbeatServices, haServices, blockingJobManagerRunnerFactory); - - final DispatcherGateway dispatcherGateway = - dispatcher.getSelfGateway(DispatcherGateway.class); - - // submit and fail during job master runner construction - queue.offer(Optional.of(testException)); - try { - dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); - fail("A FlinkException is expected"); - } catch (Throwable expectedException) { - assertThat(expectedException, containsCause(FlinkException.class)); - assertThat(expectedException, containsMessage(testException.getMessage())); - // make sure we've cleaned up in correct order (including HA) - assertThat( - new ArrayList<>(cleanUpEvents), - containsInAnyOrder(CLEANUP_JOB_GRAPH_REMOVE, CLEANUP_HA_SERVICES)); - } - - // don't fail this time - queue.offer(Optional.empty()); - // submit job again - dispatcherGateway.submitJob(jobGraph, TIMEOUT).get(); - blockingJobManagerRunnerFactory.setJobStatus(JobStatus.RUNNING); - - // Ensure job is running - awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING); - } - @Test public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception { final TestingJobGraphStore submittedJobGraphStore = @@ -812,7 +728,7 @@ public class DispatcherTest extends AbstractDispatcherTest { haServices.setJobGraphStore(submittedJobGraphStore); dispatcher = - new TestingDispatcherBuilder().setJobGraphWriter(submittedJobGraphStore).build(); + createTestingDispatcherBuilder().setJobGraphWriter(submittedJobGraphStore).build(); dispatcher.start(); @@ -930,8 +846,8 @@ public class DispatcherTest extends AbstractDispatcherTest { testingJobGraphStore.start(null); dispatcher = - new 
TestingDispatcherBuilder() - .setInitialJobGraphs(Collections.singleton(jobGraph)) + createTestingDispatcherBuilder() + .setRecoveredJobs(Collections.singleton(jobGraph)) .setJobGraphWriter(testingJobGraphStore) .build(); dispatcher.start(); @@ -1110,8 +1026,8 @@ public class DispatcherTest extends AbstractDispatcherTest { final PermanentBlobKey blobKey2 = blobServer.putPermanent(jobId2, fileContent); dispatcher = - new TestingDispatcherBuilder() - .setInitialJobGraphs(Collections.singleton(new JobGraph(jobId1, "foobar"))) + createTestingDispatcherBuilder() + .setRecoveredJobs(Collections.singleton(new JobGraph(jobId1, "foobar"))) .build(); Assertions.assertThat(blobServer.getFile(jobId1, blobKey1)).hasBinaryContent(fileContent); @@ -1365,70 +1281,6 @@ public class DispatcherTest extends AbstractDispatcherTest { } } - private static final class BlockingJobManagerRunnerFactory - extends TestingJobManagerRunnerFactory { - - @Nonnull private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch; - private TestingJobManagerRunner testingRunner; - - BlockingJobManagerRunnerFactory( - @Nonnull ThrowingRunnable<Exception> jobManagerRunnerCreationLatch) { - this.jobManagerRunnerCreationLatch = jobManagerRunnerCreationLatch; - } - - @Override - public TestingJobManagerRunner createJobManagerRunner( - JobGraph jobGraph, - Configuration configuration, - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - JobManagerSharedServices jobManagerSharedServices, - JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, - FatalErrorHandler fatalErrorHandler, - long initializationTimestamp) - throws Exception { - jobManagerRunnerCreationLatch.run(); - - this.testingRunner = - super.createJobManagerRunner( - jobGraph, - configuration, - rpcService, - highAvailabilityServices, - heartbeatServices, - jobManagerSharedServices, - jobManagerJobMetricGroupFactory, - fatalErrorHandler, - 
initializationTimestamp); - - TestingJobMasterGateway testingJobMasterGateway = - new TestingJobMasterGatewayBuilder() - .setRequestJobSupplier( - () -> - CompletableFuture.completedFuture( - new ExecutionGraphInfo( - ArchivedExecutionGraph - .createFromInitializingJob( - jobGraph.getJobID(), - jobGraph.getName(), - JobStatus.RUNNING, - null, - null, - 1337)))) - .build(); - testingRunner.completeJobMasterGatewayFuture(testingJobMasterGateway); - return testingRunner; - } - - public void setJobStatus(JobStatus newStatus) { - Preconditions.checkState( - testingRunner != null, - "JobManagerRunner must be created before this method is available"); - this.testingRunner.setJobStatus(newStatus); - } - } - private static final class InitializationTimestampCapturingJobManagerRunnerFactory implements JobManagerRunnerFactory { private final BlockingQueue<Long> initializationTimestampQueue; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java index ba5ecca..5c3efc7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingDispatcher.java @@ -20,16 +20,39 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.dispatcher.cleanup.DispatcherResourceCleanerFactory; +import org.apache.flink.runtime.dispatcher.cleanup.ResourceCleanerFactory; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.JobResultStore; +import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.util.TestingFatalErrorHandler; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Collection; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; import java.util.function.Function; /** {@link Dispatcher} implementation used for testing purposes. 
*/
@@ -56,6 +79,58 @@ class TestingDispatcher extends Dispatcher {
         this.startFuture = new CompletableFuture<>();
     }
 
+    /**
+     * Creates a {@code TestingDispatcher} from individually provided components. The service
+     * components are bundled into a {@link DispatcherServices} instance before everything is
+     * passed on to the {@link Dispatcher} constructor. Instances are created through {@link
+     * Builder#build()}.
+     */
+    private TestingDispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> recoveredDirtyJobs,
+            Configuration configuration,
+            HighAvailabilityServices highAvailabilityServices,
+            GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
+            HeartbeatServices heartbeatServices,
+            BlobServer blobServer,
+            FatalErrorHandler fatalErrorHandler,
+            JobGraphWriter jobGraphWriter,
+            JobResultStore jobResultStore,
+            JobManagerMetricGroup jobManagerMetricGroup,
+            @Nullable String metricServiceQueryAddress,
+            Executor ioExecutor,
+            HistoryServerArchivist historyServerArchivist,
+            ExecutionGraphInfoStore executionGraphInfoStore,
+            JobManagerRunnerFactory jobManagerRunnerFactory,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            DispatcherOperationCaches dispatcherOperationCaches,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry,
+            ResourceCleanerFactory resourceCleanerFactory)
+            throws Exception {
+        super(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                recoveredDirtyJobs,
+                dispatcherBootstrapFactory,
+                new DispatcherServices(
+                        configuration,
+                        highAvailabilityServices,
+                        resourceManagerGatewayRetriever,
+                        blobServer,
+                        heartbeatServices,
+                        executionGraphInfoStore,
+                        fatalErrorHandler,
+                        historyServerArchivist,
+                        metricServiceQueryAddress,
+                        dispatcherOperationCaches,
+                        jobManagerMetricGroup,
+                        jobGraphWriter,
+                        jobResultStore,
+                        jobManagerRunnerFactory,
+                        ioExecutor),
+                jobManagerRunnerRegistry,
+                resourceCleanerFactory);
+
+        this.startFuture = new CompletableFuture<>();
+    }
+
     @Override
     public void onStart() throws Exception {
         try {
@@ -91,4 +166,210 @@ class TestingDispatcher extends Dispatcher {
     void waitUntilStarted() {
         startFuture.join();
     }
+
+    /** Returns a new {@link Builder} that is pre-populated with test defaults. */
+    public static TestingDispatcher.Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link TestingDispatcher} instances. Every component has a test default except
+     * for the {@link BlobServer}, which has to be set before calling {@link #build()}.
+     */
+    public static class Builder {
+        // Deliberately left unset: the default TestingRpcService is only instantiated in build()
+        // if no RpcService was provided, so that overriding it through setRpcService never leaves
+        // an unused instance behind that nobody can stop.
+        // NOTE(review): assumes that TestingRpcService acquires resources on construction - confirm
+        @Nullable private RpcService rpcService = null;
+        private DispatcherId fencingToken = DispatcherId.generate();
+        private Collection<JobGraph> recoveredJobs = Collections.emptyList();
+        private Collection<JobResult> recoveredDirtyJobs = Collections.emptyList();
+        private HighAvailabilityServices highAvailabilityServices =
+                new TestingHighAvailabilityServices();
+
+        private TestingResourceManagerGateway resourceManagerGateway =
+                new TestingResourceManagerGateway();
+        // the lambda reads the field on each call, i.e. a gateway that is set later on is picked up
+        private GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever =
+                () -> CompletableFuture.completedFuture(resourceManagerGateway);
+        private HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
+
+        private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;
+        private JobResultStore jobResultStore = new EmbeddedJobResultStore();
+
+        private Configuration configuration = new Configuration();
+
+        // even though it's labeled as @Nullable, it's a mandatory field that needs to be set before
+        // building the Dispatcher instance
+        @Nullable private BlobServer blobServer = null;
+        private FatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
+        private JobManagerMetricGroup jobManagerMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
+        @Nullable private String metricServiceQueryAddress = null;
+        private Executor ioExecutor = ForkJoinPool.commonPool();
+        private HistoryServerArchivist historyServerArchivist = VoidHistoryServerArchivist.INSTANCE;
+        private ExecutionGraphInfoStore executionGraphInfoStore =
+                new MemoryExecutionGraphInfoStore();
+        private JobManagerRunnerFactory jobManagerRunnerFactory =
+                new TestingJobManagerRunnerFactory(0);
+        private DispatcherBootstrapFactory dispatcherBootstrapFactory =
+                (dispatcher, scheduledExecutor, errorHandler) -> new NoOpDispatcherBootstrap();
+        private DispatcherOperationCaches dispatcherOperationCaches =
+                new DispatcherOperationCaches();
+        private JobManagerRunnerRegistry jobManagerRunnerRegistry =
+                new DefaultJobManagerRunnerRegistry(1);
+        // falls back to createDefaultResourceCleanerFactory() in build() if it is not set
+        @Nullable private ResourceCleanerFactory resourceCleanerFactory;
+
+        public Builder setRpcService(RpcService rpcService) {
+            this.rpcService = rpcService;
+            return this;
+        }
+
+        public Builder setFencingToken(DispatcherId fencingToken) {
+            this.fencingToken = fencingToken;
+            return this;
+        }
+
+        public Builder setRecoveredJobs(Collection<JobGraph> recoveredJobs) {
+            this.recoveredJobs = recoveredJobs;
+            return this;
+        }
+
+        public Builder setRecoveredDirtyJobs(Collection<JobResult> recoveredDirtyJobs) {
+            this.recoveredDirtyJobs = recoveredDirtyJobs;
+            return this;
+        }
+
+        public Builder setHighAvailabilityServices(
+                HighAvailabilityServices highAvailabilityServices) {
+            this.highAvailabilityServices = highAvailabilityServices;
+            return this;
+        }
+
+        /**
+         * Sets the gateway that is served by the default retriever. Calling {@link
+         * #setResourceManagerGatewayRetriever(GatewayRetriever)} replaces that default retriever.
+         */
+        public Builder setResourceManagerGateway(
+                TestingResourceManagerGateway resourceManagerGateway) {
+            this.resourceManagerGateway = resourceManagerGateway;
+            return this;
+        }
+
+        public Builder setResourceManagerGatewayRetriever(
+                GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
+            this.resourceManagerGatewayRetriever = resourceManagerGatewayRetriever;
+            return this;
+        }
+
+        public Builder setHeartbeatServices(HeartbeatServices heartbeatServices) {
+            this.heartbeatServices = heartbeatServices;
+            return this;
+        }
+
+        public Builder setJobGraphWriter(JobGraphWriter jobGraphWriter) {
+            this.jobGraphWriter = jobGraphWriter;
+            return this;
+        }
+
+        public Builder setJobResultStore(JobResultStore jobResultStore) {
+            this.jobResultStore = jobResultStore;
+            return this;
+        }
+
+        public Builder setConfiguration(Configuration configuration) {
+            this.configuration = configuration;
+            return this;
+        }
+
+        /** Sets the mandatory {@link BlobServer}; {@link #build()} fails without it. */
+        public Builder setBlobServer(BlobServer blobServer) {
+            this.blobServer = blobServer;
+            return this;
+        }
+
+        public Builder setFatalErrorHandler(FatalErrorHandler fatalErrorHandler) {
+            this.fatalErrorHandler = fatalErrorHandler;
+            return this;
+        }
+
+        public Builder setJobManagerMetricGroup(JobManagerMetricGroup jobManagerMetricGroup) {
+            this.jobManagerMetricGroup = jobManagerMetricGroup;
+            return this;
+        }
+
+        public Builder setMetricServiceQueryAddress(@Nullable String metricServiceQueryAddress) {
+            this.metricServiceQueryAddress = metricServiceQueryAddress;
+            return this;
+        }
+
+        public Builder setIoExecutor(Executor ioExecutor) {
+            this.ioExecutor = ioExecutor;
+            return this;
+        }
+
+        public Builder setHistoryServerArchivist(HistoryServerArchivist historyServerArchivist) {
+            this.historyServerArchivist = historyServerArchivist;
+            return this;
+        }
+
+        public Builder setExecutionGraphInfoStore(ExecutionGraphInfoStore executionGraphInfoStore) {
+            this.executionGraphInfoStore = executionGraphInfoStore;
+            return this;
+        }
+
+        public Builder setJobManagerRunnerFactory(JobManagerRunnerFactory jobManagerRunnerFactory) {
+            this.jobManagerRunnerFactory = jobManagerRunnerFactory;
+            return this;
+        }
+
+        public Builder setDispatcherBootstrapFactory(
+                DispatcherBootstrapFactory dispatcherBootstrapFactory) {
+            this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
+            return this;
+        }
+
+        public Builder setDispatcherOperationCaches(
+                DispatcherOperationCaches dispatcherOperationCaches) {
+            this.dispatcherOperationCaches = dispatcherOperationCaches;
+            return this;
+        }
+
+        public Builder setJobManagerRunnerRegistry(
+                JobManagerRunnerRegistry jobManagerRunnerRegistry) {
+            this.jobManagerRunnerRegistry = jobManagerRunnerRegistry;
+            return this;
+        }
+
+        /** Overrides the factory that {@link #build()} would otherwise derive from the builder. */
+        public Builder setResourceCleanerFactory(ResourceCleanerFactory resourceCleanerFactory) {
+            this.resourceCleanerFactory = resourceCleanerFactory;
+            return this;
+        }
+
+        /**
+         * Creates the fallback factory from the components configured in this builder at the time
+         * of the call.
+         */
+        private ResourceCleanerFactory createDefaultResourceCleanerFactory() {
+            return new DispatcherResourceCleanerFactory(
+                    ioExecutor,
+                    jobManagerRunnerRegistry,
+                    jobGraphWriter,
+                    blobServer,
+                    highAvailabilityServices,
+                    jobManagerMetricGroup);
+        }
+
+        /**
+         * Creates the {@link TestingDispatcher} from the configured components. Requires that a
+         * {@link BlobServer} was set through {@link #setBlobServer(BlobServer)}.
+         */
+        public TestingDispatcher build() throws Exception {
+            // fail on a missing BlobServer before any default component gets instantiated
+            Preconditions.checkNotNull(
+                    blobServer, "No BlobServer is specified for building the TestingDispatcher");
+
+            if (rpcService == null) {
+                // created on demand and kept in the builder so that repeated build() calls share
+                // the same default instance
+                rpcService = new TestingRpcService();
+            }
+
+            return new TestingDispatcher(
+                    rpcService,
+                    fencingToken,
+                    recoveredJobs,
+                    recoveredDirtyJobs,
+                    configuration,
+                    highAvailabilityServices,
+                    resourceManagerGatewayRetriever,
+                    heartbeatServices,
+                    blobServer,
+                    fatalErrorHandler,
+                    jobGraphWriter,
+                    jobResultStore,
+                    jobManagerMetricGroup,
+                    metricServiceQueryAddress,
+                    ioExecutor,
+                    historyServerArchivist,
+                    executionGraphInfoStore,
+                    jobManagerRunnerFactory,
+                    dispatcherBootstrapFactory,
+                    dispatcherOperationCaches,
+                    jobManagerRunnerRegistry,
+                    resourceCleanerFactory != null
+                            ? resourceCleanerFactory
+                            : createDefaultResourceCleanerFactory());
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java
new file mode 100644
index 0000000..2d54b8b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/TestingResourceCleanerFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher.cleanup;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.Executor;
+
+/**
+ * {@link ResourceCleanerFactory} for tests. The created {@link ResourceCleaner} instances trigger
+ * the cleanup of the registered resources one after another and wait for each cleanup to finish
+ * before the next one is triggered.
+ */
+public class TestingResourceCleanerFactory implements ResourceCleanerFactory {
+
+    private final Collection<LocallyCleanableResource> locallyCleanableResources;
+    private final Collection<GloballyCleanableResource> globallyCleanableResources;
+
+    private final Executor cleanupExecutor;
+
+    private TestingResourceCleanerFactory(
+            Collection<LocallyCleanableResource> locallyCleanableResources,
+            Collection<GloballyCleanableResource> globallyCleanableResources,
+            Executor cleanupExecutor) {
+        // snapshot the resources so that reusing the builder does not alter this factory
+        this.locallyCleanableResources = new ArrayList<>(locallyCleanableResources);
+        this.globallyCleanableResources = new ArrayList<>(globallyCleanableResources);
+        this.cleanupExecutor = cleanupExecutor;
+    }
+
+    @Override
+    public ResourceCleaner createLocalResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        return createResourceCleaner(
+                mainThreadExecutor,
+                locallyCleanableResources,
+                LocallyCleanableResource::localCleanupAsync);
+    }
+
+    @Override
+    public ResourceCleaner createGlobalResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        return createResourceCleaner(
+                mainThreadExecutor,
+                globallyCleanableResources,
+                GloballyCleanableResource::globalCleanupAsync);
+    }
+
+    /**
+     * Creates a {@link ResourceCleaner} that asserts that it is called in the main thread and
+     * then applies {@code cleanupFn} to each of the resources, blocking on every returned future.
+     * A failing cleanup does not stop the loop: all errors are collected through {@link
+     * ExceptionUtils#firstOrSuppressed(Throwable, Throwable)} and reported through the returned
+     * future.
+     */
+    private <T> ResourceCleaner createResourceCleaner(
+            ComponentMainThreadExecutor mainThreadExecutor,
+            Collection<T> resources,
+            DefaultResourceCleaner.CleanupFn<T> cleanupFn) {
+        return jobId -> {
+            mainThreadExecutor.assertRunningInMainThread();
+            Throwable t = null;
+            boolean interrupted = false;
+            for (T resource : resources) {
+                try {
+                    cleanupFn.cleanupAsync(resource, jobId, cleanupExecutor).get();
+                } catch (Throwable throwable) {
+                    interrupted |= throwable instanceof InterruptedException;
+                    t = ExceptionUtils.firstOrSuppressed(throwable, t);
+                }
+            }
+
+            if (interrupted) {
+                // the interrupt status is cleared when get() throws; it is restored only after
+                // the loop so that the remaining cleanups are still waited for
+                Thread.currentThread().interrupt();
+            }
+
+            return t != null
+                    ? FutureUtils.completedExceptionally(t)
+                    : FutureUtils.completedVoidFuture();
+        };
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** {@code Builder} for creating {@code TestingResourceCleanerFactory} instances. */
+    public static class Builder {
+
+        private Collection<LocallyCleanableResource> locallyCleanableResources = new ArrayList<>();
+        private Collection<GloballyCleanableResource> globallyCleanableResources =
+                new ArrayList<>();
+
+        private Executor cleanupExecutor = Executors.directExecutor();
+
+        /**
+         * Replaces the registered local resources with a copy of the passed collection. The
+         * passed collection itself is never modified by this builder and may be immutable.
+         */
+        public Builder setLocallyCleanableResources(
+                Collection<LocallyCleanableResource> locallyCleanableResources) {
+            this.locallyCleanableResources = new ArrayList<>(locallyCleanableResources);
+            return this;
+        }
+
+        /** Registers an additional resource for the local cleanup. */
+        public Builder withLocallyCleanableResource(
+                LocallyCleanableResource locallyCleanableResource) {
+            this.locallyCleanableResources.add(locallyCleanableResource);
+            return this;
+        }
+
+        /**
+         * Replaces the registered global resources with a copy of the passed collection. The
+         * passed collection itself is never modified by this builder and may be immutable.
+         */
+        public Builder setGloballyCleanableResources(
+                Collection<GloballyCleanableResource> globallyCleanableResources) {
+            this.globallyCleanableResources = new ArrayList<>(globallyCleanableResources);
+            return this;
+        }
+
+        /** Registers an additional resource for the global cleanup. */
+        public Builder withGloballyCleanableResource(
+                GloballyCleanableResource globallyCleanableResource) {
+            this.globallyCleanableResources.add(globallyCleanableResource);
+            return this;
+        }
+
+        /** Sets the executor that is passed to every cleanup call. */
+        public Builder setCleanupExecutor(Executor cleanupExecutor) {
+            this.cleanupExecutor = cleanupExecutor;
+            return this;
+        }
+
+        public TestingResourceCleanerFactory build() {
+            return new TestingResourceCleanerFactory(
+                    locallyCleanableResources, globallyCleanableResources, cleanupExecutor);
+        }
+    }
+}
