dmvk commented on a change in pull request #18536: URL: https://github.com/apache/flink/pull/18536#discussion_r796361359
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * {@code GloballyCleanableResource} is supposed to be used by any class that provides artifacts for + * a given job that can be cleaned up globally. + * + * @see LocallyCleanableResource + */ +@FunctionalInterface +public interface GloballyCleanableResource { + + void globalCleanup(JobID jobId) throws Throwable; Review comment: We should use a more strict checked exception here, eg. `IOException` sounds reasonable; same for the `Local...` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * {@code GloballyCleanableResource} is supposed to be used by any class that provides artifacts for + * a given job that can be cleaned up globally. 
+ * + * @see LocallyCleanableResource + */ +@FunctionalInterface +public interface GloballyCleanableResource { Review comment: nit: it might be useful to elaborate in the javadocs on what the difference between local and global cleanup is (same with the `Local...` one); plus, the methods are missing javadocs ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java ########## @@ -185,15 +185,15 @@ public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws IOExcept private final Queue<? super CloseOperations> closeOperations; private final RunnableWithException internalCleanupRunnable; - private final Consumer<JobID> internalJobCleanupConsumer; + private final ThrowingConsumer<JobID, Exception> internalJobCleanupConsumer; Review comment: signature -> `ThrowingConsumer<JobID, IOException>` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ########## @@ -286,7 +291,12 @@ public void closeAndCleanupAllData() throws Exception { } @Override - public void cleanupJobData(JobID jobID) { - Optional.ofNullable(jobCleanupFuture).ifPresent(f -> f.complete(jobID)); + public void globalCleanup(JobID jobID) throws Exception { + Optional.ofNullable(globalCleanupFuture).ifPresent(f -> f.complete(jobID)); Review comment: why don't we use a simple null check here? This doesn't really add anything in terms of readability or performance. (same for local cleanup) ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java ########## @@ -109,12 +114,16 @@ public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed() final JobID jobId = jobGraph.getJobID(); // Construct job graph store. - final Error jobGraphRemovalError = new Error("Unable to remove job graph."); + final Error temporaryError = new Error("Unable to remove job graph."); + final AtomicReference<? extends Error> temporaryErrorRef = Review comment: Why an error instead of an exception? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java ########## @@ -109,12 +114,16 @@ public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed() final JobID jobId = jobGraph.getJobID(); // Construct job graph store. - final Error jobGraphRemovalError = new Error("Unable to remove job graph."); + final Error temporaryError = new Error("Unable to remove job graph."); + final AtomicReference<? extends Error> temporaryErrorRef = + new AtomicReference<>(temporaryError); final TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder() - .setRemoveJobGraphConsumer( + .setGlobalCleanupConsumer( graph -> { - throw jobGraphRemovalError; + if (temporaryErrorRef.get() != null) { + throw temporaryErrorRef.getAndSet(null); + } Review comment: ```suggestion final Error error = temporaryErrorRef.getAndSet(null); if (error != null) { throw error; } ``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java ########## @@ -42,5 +41,9 @@ * @param jobId specifying the job to release the locks for * @throws Exception if the locks cannot be released */ Review comment: should we remove the javadocs?
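Coming back to the `GloballyCleanableResource` comments above (stricter checked `IOException`, javadocs spelling out the local/global distinction): the following is only a rough sketch of what the interface could look like if those suggestions were applied, not code from the PR.

```java
package org.apache.flink.runtime.dispatcher.cleanup;

import org.apache.flink.api.common.JobID;

import java.io.IOException;

/**
 * Sketch only: {@code GloballyCleanableResource} marks classes owning job artifacts that
 * outlive a single JobManager process (e.g. entries in the HA services or the BlobStore).
 * In contrast, {@link LocallyCleanableResource} covers node-local artifacts that can be
 * removed without touching the globally persisted state needed for recovery.
 */
@FunctionalInterface
public interface GloballyCleanableResource {

    /**
     * Removes all globally persisted artifacts belonging to the given job. Implementations
     * are expected to be thread-safe.
     *
     * @param jobId The job whose global artifacts should be cleaned up.
     * @throws IOException if the cleanup fails.
     */
    void globalCleanup(JobID jobId) throws IOException;
}
```

The `LocallyCleanableResource` counterpart would mirror this with a `localCleanup(JobID)` method and the same exception type.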
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ########## @@ -861,61 +869,73 @@ public boolean deletePermanent(JobID jobId, PermanentBlobKey key) { } /** - * Removes all BLOBs from local and HA store belonging to the given job ID. + * Deletes locally stored artifacts for the job represented by the given {@link JobID}. This + * doesn't touch the job's entry in the {@link BlobStore} to enable recovering. * - * @param jobId ID of the job this blob belongs to - * @param cleanupBlobStoreFiles True if the corresponding blob store files shall be cleaned up - * as well. Otherwise false. - * @return <tt>true</tt> if the job directory is successfully deleted or non-existing; - * <tt>false</tt> otherwise + * @param jobId The {@code JobID} of the job that is subject to cleanup. + * @throws IOException if the cleanup failed. */ - public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) { + @Override + public void localCleanup(JobID jobId) throws IOException { checkNotNull(jobId); final File jobDir = new File( BlobUtils.getStorageLocationPath( storageDir.deref().getAbsolutePath(), jobId)); + FileUtils.deleteDirectory(jobDir); + + // NOTE on why blobExpiryTimes are not cleaned up: + // Instead of going through blobExpiryTimes, keep lingering entries - they + // will be cleaned up by the timer task which tolerates non-existing files + // If inserted again with the same IDs (via put()), the TTL will be updated + // again. + } + + /** + * Removes all BLOBs from local and HA store belonging to the given {@link JobID}. + * + * @param jobId ID of the job this blob belongs to + * @throws Exception if the cleanup fails. + */ + @Override + public void globalCleanup(JobID jobId) throws Exception { + checkNotNull(jobId); + readWriteLock.writeLock().lock(); try { - // delete locally - boolean deletedLocally = false; - try { - FileUtils.deleteDirectory(jobDir); - - // NOTE on why blobExpiryTimes are not cleaned up: - // Instead of going through blobExpiryTimes, keep lingering entries - they - // will be cleaned up by the timer task which tolerates non-existing files - // If inserted again with the same IDs (via put()), the TTL will be updated - // again. + Exception exception = null; - deletedLocally = true; + try { + localCleanup(jobId); } catch (IOException e) { - LOG.warn( - "Failed to locally delete BLOB storage directory at " - + jobDir.getAbsolutePath(), - e); + exception = e; } - // delete in HA blob store files - final boolean deletedHA = !cleanupBlobStoreFiles || blobStore.deleteAll(jobId); + if (!blobStore.deleteAll(jobId)) { + exception = + ExceptionUtils.firstOrSuppressed( + new FlinkException( + "Error while cleaning up the BlobStore for job " + jobId), + exception); + } - return deletedLocally && deletedHA; + ExceptionUtils.tryRethrowException(exception); } finally { readWriteLock.writeLock().unlock(); } } - public void retainJobs(Collection<JobID> jobsToRetain) throws IOException { + public void retainJobs(Collection<JobID> jobsToRetain) throws Exception { Review comment: I don't think we should change the signature here. 
See the comment on `GloballyCleanableResource` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobServerDeleteTest.java ########## @@ -266,21 +266,22 @@ private void testDeleteBlobFails(@Nullable final JobID jobId, BlobKey.BlobType b } @Test - public void testJobCleanup() throws IOException { Review comment: again, we should get rid of all these signature changes, they're not needed ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java ########## @@ -33,7 +35,8 @@ * <p>Contains extra logic for adding jobs with tasks, and removing jobs when they do not contain * tasks any more */ -public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup> { +public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetricGroup> + implements LocallyCleanableResource, GloballyCleanableResource { Review comment: This makes me think: JobManagerMetricGroup is not really globally cleanable -> it doesn't have any globally persistent state, so it shouldn't implement `GloballyCleanableResource` 🤔 ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ########## @@ -861,61 +869,73 @@ public boolean deletePermanent(JobID jobId, PermanentBlobKey key) { } /** - * Removes all BLOBs from local and HA store belonging to the given job ID. + * Deletes locally stored artifacts for the job represented by the given {@link JobID}. This + * doesn't touch the job's entry in the {@link BlobStore} to enable recovering. * - * @param jobId ID of the job this blob belongs to - * @param cleanupBlobStoreFiles True if the corresponding blob store files shall be cleaned up - * as well. Otherwise false. - * @return <tt>true</tt> if the job directory is successfully deleted or non-existing; - * <tt>false</tt> otherwise + * @param jobId The {@code JobID} of the job that is subject to cleanup. + * @throws IOException if the cleanup failed. */ - public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) { + @Override + public void localCleanup(JobID jobId) throws IOException { checkNotNull(jobId); final File jobDir = new File( BlobUtils.getStorageLocationPath( storageDir.deref().getAbsolutePath(), jobId)); + FileUtils.deleteDirectory(jobDir); + + // NOTE on why blobExpiryTimes are not cleaned up: + // Instead of going through blobExpiryTimes, keep lingering entries - they + // will be cleaned up by the timer task which tolerates non-existing files + // If inserted again with the same IDs (via put()), the TTL will be updated + // again. Review comment: nit: ```suggestion // NOTE on why blobExpiryTimes are not cleaned up: Instead of going through blobExpiryTimes, // keep lingering entries - they will be cleaned up by the timer task which tolerates // non-existing files If inserted again with the same IDs (via put()), the TTL will be // updated again. 
``` ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -162,34 +162,98 @@ public Dispatcher( DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { + this( + rpcService, + fencingToken, + recoveredJobs, + recoveredDirtyJobs, + dispatcherBootstrapFactory, + dispatcherServices, + new JobManagerRunnerRegistry(16)); + } + + private Dispatcher( Review comment: I don't see how this change improves testability, as we're only introducing new private constructors ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; +import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** {@code JobManagerRunner} collects running jobs represented by {@link JobManagerRunner}. */ +public class JobManagerRunnerRegistry Review comment: Same as with Metrics, `JobManagerRunnerRegistry` should really only implement the `LocallyCleanableResource` interface as it doesn't have any global resources. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -943,17 +945,16 @@ private void cleanUpHighAvailabilityJobData(JobID jobId) { private void terminateRunningJobs() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); - final HashSet<JobID> jobsToRemove = new HashSet<>(runningJobs.keySet()); + final Set<JobID> jobsToRemove = jobManagerRunnerRegistry.getRunningJobIds(); Review comment: this doesn't share the same semantics as the original code as we no longer make a copy of the set. Is that an issue? If not, we may want to inline this into the for loop. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** {@code DefaultResourceCleaner} is the default implementation of {@link ResourceCleaner}. */ +public class DefaultResourceCleaner implements ResourceCleaner { + + private final Collection<BiFunction<JobID, Executor, CompletableFuture<Void>>> + prioritizedOrderedJobRelatedCleanups = new ArrayList<>(); + private final Collection<BiFunction<JobID, Executor, CompletableFuture<Void>>> + jobRelatedCleanups = new ArrayList<>(); + private final Executor cleanupExecutor; + + DefaultResourceCleaner(Executor cleanupExecutor) { Review comment: It would be good to also inject the `ComponentMainThreadExecutor` here and assert that cleanupAsync is executed in the main thread as some implementations rely on it (eg. 
JobManagerRunnerRegistry) ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ########## @@ -162,34 +162,98 @@ public Dispatcher( DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception { + this( + rpcService, + fencingToken, + recoveredJobs, + recoveredDirtyJobs, + dispatcherBootstrapFactory, + dispatcherServices, + new JobManagerRunnerRegistry(16)); + } + + private Dispatcher( + RpcService rpcService, + DispatcherId fencingToken, + Collection<JobGraph> recoveredJobs, + Collection<JobResult> globallyTerminatedJobs, + DispatcherBootstrapFactory dispatcherBootstrapFactory, + DispatcherServices dispatcherServices, + JobManagerRunnerRegistry jobManagerRunnerRegistry) + throws Exception { + this( + rpcService, + fencingToken, + recoveredJobs, + globallyTerminatedJobs, + dispatcherServices.getConfiguration(), + dispatcherServices.getHighAvailabilityServices(), + dispatcherServices.getResourceManagerGatewayRetriever(), + dispatcherServices.getHeartbeatServices(), + dispatcherServices.getBlobServer(), + dispatcherServices.getFatalErrorHandler(), + dispatcherServices.getJobGraphWriter(), + dispatcherServices.getJobResultStore(), + dispatcherServices.getJobManagerMetricGroup(), + dispatcherServices.getMetricQueryServiceAddress(), + dispatcherServices.getIoExecutor(), + dispatcherServices.getHistoryServerArchivist(), + dispatcherServices.getArchivedExecutionGraphStore(), + dispatcherServices.getJobManagerRunnerFactory(), + dispatcherBootstrapFactory, + dispatcherServices.getOperationCaches(), + jobManagerRunnerRegistry); + } + + private Dispatcher( + RpcService rpcService, + DispatcherId fencingToken, + Collection<JobGraph> recoveredJobs, + Collection<JobResult> recoveredDirtyJobs, + Configuration configuration, + HighAvailabilityServices highAvailabilityServices, + GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, + HeartbeatServices heartbeatServices, + BlobServer blobServer, + FatalErrorHandler fatalErrorHandler, + JobGraphWriter jobGraphWriter, + JobResultStore jobResultStore, + JobManagerMetricGroup jobManagerMetricGroup, + @Nullable String metricServiceQueryAddress, + Executor ioExecutor, + HistoryServerArchivist historyServerArchivist, + ExecutionGraphInfoStore executionGraphInfoStore, + JobManagerRunnerFactory jobManagerRunnerFactory, + DispatcherBootstrapFactory dispatcherBootstrapFactory, + DispatcherOperationCaches dispatcherOperationCaches, + JobManagerRunnerRegistry jobManagerRunnerRegistry) + throws Exception { super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), fencingToken); - checkNotNull(dispatcherServices); assertRecoveredJobsAndDirtyJobResults(recoveredJobs, recoveredDirtyJobs); - this.configuration = dispatcherServices.getConfiguration(); - this.highAvailabilityServices = dispatcherServices.getHighAvailabilityServices(); - this.resourceManagerGatewayRetriever = - dispatcherServices.getResourceManagerGatewayRetriever(); - this.heartbeatServices = dispatcherServices.getHeartbeatServices(); - this.blobServer = dispatcherServices.getBlobServer(); - this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler(); - this.jobGraphWriter = dispatcherServices.getJobGraphWriter(); - this.jobResultStore = dispatcherServices.getJobResultStore(); - this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup(); - this.metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress(); - this.ioExecutor = 
dispatcherServices.getIoExecutor(); + this.configuration = checkNotNull(configuration); + this.highAvailabilityServices = checkNotNull(highAvailabilityServices); + this.resourceManagerGatewayRetriever = checkNotNull(resourceManagerGatewayRetriever); + this.heartbeatServices = checkNotNull(heartbeatServices); + this.blobServer = checkNotNull(blobServer); + this.fatalErrorHandler = checkNotNull(fatalErrorHandler); + this.jobGraphWriter = checkNotNull(jobGraphWriter); + this.jobResultStore = checkNotNull(jobResultStore); + this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup); + this.metricServiceQueryAddress = metricServiceQueryAddress; + this.ioExecutor = checkNotNull(ioExecutor); Review comment: I'm not really a big fan of these safeguards. - It's hard to keep them in sync when new code is added - We should be able to enforce this at compile time simply by checking annotations (eg. https://github.com/uber/NullAway) - It adds noise. What was the reason for adding these checks? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java ########## @@ -169,28 +177,30 @@ public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed() // This will clear internal state of election service, so a new contender can register. leaderElectionService.stop(); + assertThat( + "The JobGraph is still stored in the JobGraphStore.", + haServices.getJobGraphStore().getJobIds(), + CoreMatchers.is(Collections.singleton(jobId))); + assertThat( + "The JobResultStore has this job marked as dirty.", + haServices.getJobResultStore().getDirtyResults().stream() + .map(JobResult::getJobId) + .collect(Collectors.toSet()), + CoreMatchers.is(Collections.singleton(jobId))); + // Run a second dispatcher, that restores our finished job. final Dispatcher secondDispatcher = createRecoveredDispatcher(null); toTerminate.add(secondDispatcher); - final DispatcherGateway secondDispatcherGateway = - secondDispatcher.getSelfGateway(DispatcherGateway.class); + + // new Dispatcher becomes new leader leaderElectionService.isLeader(UUID.randomUUID()); - // Now make sure that restored job started from checkpoint. - final JobMasterGateway secondJobMasterGateway = - connectToLeadingJobMaster(leaderElectionService).get(); - try (final JobMasterTester tester = - new JobMasterTester(rpcService, jobId, secondJobMasterGateway)) { - final CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture = - tester.deployVertices(2); - awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING); - final Optional<JobManagerTaskRestore> maybeRestore = - descriptorsFuture.get().stream() - .map(TaskDeploymentDescriptor::getTaskRestore) - .filter(Objects::nonNull) - .findAny(); - assertTrue("Job has recovered from checkpoint.", maybeRestore.isPresent()); - } + assertThrows( Review comment: should we simply poll the job result from the second dispatcher here? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java ########## @@ -619,34 +618,6 @@ public boolean hasCleanJobResultEntry(JobID jobId) throws IOException { } } - @Test - public void testHABlobsAreNotRemovedIfHAJobGraphRemovalFails() throws Exception { Review comment: why is this test no longer relevant? 
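Regarding the question above about polling the job result from the second dispatcher: a hypothetical fragment of what that could look like inside the test, reusing the test's existing `jobId` and `secondDispatcher`; the timeout value and the final assertion are made up for illustration.

```java
// Hypothetical sketch, not part of the PR: ask the recovered dispatcher for the job
// result instead of relying solely on assertThrows.
final DispatcherGateway secondDispatcherGateway =
        secondDispatcher.getSelfGateway(DispatcherGateway.class);
final JobResult jobResult =
        secondDispatcherGateway
                .requestJobResult(jobId, Time.seconds(10L)) // arbitrary timeout
                .get();
// the concrete assertion depends on what the test is supposed to verify, e.g.:
assertEquals(jobId, jobResult.getJobId());
```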
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobGraphWriter.java ########## @@ -42,5 +41,9 @@ * @param jobId specifying the job to release the locks for * @throws Exception if the locks cannot be released */ Review comment: or maybe move it to the implementation ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DispatcherResourceCleanerFactoryTest.java ########## @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobStore; +import org.apache.flink.runtime.blob.TestingBlobStoreBuilder; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.dispatcher.JobManagerRunnerRegistry; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.JobResultStore; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.JobGraphStore; +import org.apache.flink.runtime.jobmanager.JobGraphWriter; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.TestingMetricRegistry; +import org.apache.flink.util.concurrent.Executors; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * {@code DispatcherResourceCleanerFactoryTest} verifies that the resources are properly cleaned up + * for both, the {@link GloballyCleanableResource} and {@link LocallyCleanableResource} of the + * {@link org.apache.flink.runtime.dispatcher.Dispatcher}. 
+ */ +public class DispatcherResourceCleanerFactoryTest { + + private static final JobID JOB_ID = new JobID(); + + private CleanableJobManagerRegistry jobManagerRunnerRegistry; + private CleanableJobGraphWriter jobGraphWriter; + private CleanableBlobServer blobServer; + private CleanableHighAvailabilityServices highAvailabilityServices; + private JobManagerMetricGroup jobManagerMetricGroup; + + private DispatcherResourceCleanerFactory testInstance; + + @BeforeEach + public void setup() throws IOException { + jobManagerRunnerRegistry = new CleanableJobManagerRegistry(); + jobGraphWriter = new CleanableJobGraphWriter(); + blobServer = new CleanableBlobServer(); + highAvailabilityServices = new CleanableHighAvailabilityServices(); + + MetricRegistry metricRegistry = TestingMetricRegistry.builder().build(); + jobManagerMetricGroup = + JobManagerMetricGroup.createJobManagerMetricGroup( + metricRegistry, "ignored hostname"); + jobManagerMetricGroup.addJob(JOB_ID, "ignored job name"); + + testInstance = + new DispatcherResourceCleanerFactory( + Executors.directExecutor(), + jobManagerRunnerRegistry, + jobGraphWriter, + blobServer, + highAvailabilityServices, + jobManagerMetricGroup); + } + + @Test + public void testLocalResourceCleaning() { + assertGlobalCleanupNotTriggered(); + assertLocalCleanupNotTriggered(); + assertJobManagerMetricGroupNotCleaned(); + + final CompletableFuture<Void> cleanupResultFuture = + testInstance.createLocalResourceCleaner().cleanupAsync(JOB_ID); + + assertGlobalCleanupNotTriggered(); + assertLocalCleanupTriggeredWaitingForJobManagerRunnerRegistry(); + assertJobManagerMetricGroupNotCleaned(); + + assertThat(cleanupResultFuture).isNotCompleted(); + + jobManagerRunnerRegistry.completeLocalCleanup(); + + assertGlobalCleanupNotTriggered(); + assertLocalCleanupTriggered(); + assertJobManagerMetricGroupCleaned(); + + assertThat(cleanupResultFuture).isCompleted(); + } + + @Test + public void testGlobalResourceCleaning() + throws ExecutionException, InterruptedException, TimeoutException { + assertGlobalCleanupNotTriggered(); + assertLocalCleanupNotTriggered(); + assertJobManagerMetricGroupNotCleaned(); + + final CompletableFuture<Void> cleanupResultFuture = + testInstance.createGlobalResourceCleaner().cleanupAsync(JOB_ID); + + assertGlobalCleanupTriggeredWaitingForJobManagerRunnerRegistry(); + assertLocalCleanupNotTriggered(); + assertJobManagerMetricGroupNotCleaned(); + + assertThat(cleanupResultFuture).isNotCompleted(); + + jobManagerRunnerRegistry.completeGlobalCleanup(); + + assertGlobalCleanupTriggered(); + assertLocalCleanupNotTriggered(); + assertJobManagerMetricGroupCleaned(); + + assertThat(cleanupResultFuture).isCompleted(); + } + + private void assertLocalCleanupNotTriggered() { + assertThat(jobManagerRunnerRegistry.getLocalCleanupFuture()).isNotDone(); + assertThat(jobGraphWriter.getLocalCleanupFuture()).isNotDone(); + assertThat(blobServer.getLocalCleanupFuture()).isNotDone(); + assertThat(highAvailabilityServices.getLocalCleanupFuture()).isNotDone(); + } + + private void assertLocalCleanupTriggeredWaitingForJobManagerRunnerRegistry() { + assertThat(jobManagerRunnerRegistry.getLocalCleanupFuture()).isDone(); + + // the JobManagerRunnerRegistry needs to be cleaned up first + assertThat(jobGraphWriter.getLocalCleanupFuture()).isNotDone(); + assertThat(blobServer.getLocalCleanupFuture()).isNotDone(); + assertThat(highAvailabilityServices.getLocalCleanupFuture()).isNotDone(); + } + + private void assertGlobalCleanupNotTriggered() { + 
assertThat(jobManagerRunnerRegistry.getGlobalCleanupFuture()).isNotDone(); + assertThat(jobGraphWriter.getGlobalCleanupFuture()).isNotDone(); + assertThat(blobServer.getGlobalCleanupFuture()).isNotDone(); + assertThat(highAvailabilityServices.getGlobalCleanupFuture()).isNotDone(); + } + + private void assertGlobalCleanupTriggeredWaitingForJobManagerRunnerRegistry() { + assertThat(jobManagerRunnerRegistry.getGlobalCleanupFuture()).isDone(); + + // the JobManagerRunnerRegistry needs to be cleaned up first + assertThat(jobGraphWriter.getGlobalCleanupFuture()).isNotDone(); + assertThat(blobServer.getGlobalCleanupFuture()).isNotDone(); + assertThat(highAvailabilityServices.getGlobalCleanupFuture()).isNotDone(); + } + + private void assertJobManagerMetricGroupNotCleaned() { + assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(1); + } + + private void assertLocalCleanupTriggered() { + assertThat(jobManagerRunnerRegistry.getLocalCleanupFuture()).isCompleted(); + assertThat(jobGraphWriter.getLocalCleanupFuture()).isCompleted(); + assertThat(blobServer.getLocalCleanupFuture()).isCompleted(); + assertThat(highAvailabilityServices.getLocalCleanupFuture()).isCompleted(); + } + + private void assertGlobalCleanupTriggered() { + assertThat(jobManagerRunnerRegistry.getGlobalCleanupFuture()).isCompleted(); + assertThat(jobGraphWriter.getGlobalCleanupFuture()).isCompleted(); + assertThat(blobServer.getGlobalCleanupFuture()).isCompleted(); + assertThat(highAvailabilityServices.getGlobalCleanupFuture()).isCompleted(); + } + + private void assertJobManagerMetricGroupCleaned() { + assertThat(jobManagerMetricGroup.numRegisteredJobMetricGroups()).isEqualTo(0); + } + + private static class AbstractTestingCleanableResource + implements LocallyCleanableResource, GloballyCleanableResource { + + private final CompletableFuture<JobID> localCleanupFuture = new CompletableFuture<>(); + private final CompletableFuture<JobID> globalCleanupFuture = new CompletableFuture<>(); + + @Override + public void globalCleanup(JobID jobId) { + throw new UnsupportedOperationException("Synchronous globalCleanup is not supported."); + } + + @Override + public void localCleanup(JobID jobId) { + throw new UnsupportedOperationException("Synchronous localCleanup is not supported."); + } + + @Override + public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor ignoredExecutor) { + localCleanupFuture.complete(jobId); + + return FutureUtils.completedVoidFuture(); + } + + @Override + public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor ignoredExecutor) { + globalCleanupFuture.complete(jobId); + + return FutureUtils.completedVoidFuture(); + } + + public CompletableFuture<JobID> getLocalCleanupFuture() { + return localCleanupFuture; + } + + public CompletableFuture<JobID> getGlobalCleanupFuture() { + return globalCleanupFuture; + } + } + + private static class CleanableJobGraphWriter extends AbstractTestingCleanableResource + implements JobGraphWriter { + + @Override + public void putJobGraph(JobGraph jobGraph) { + throw new UnsupportedOperationException("putJobGraph operation not supported."); + } + } + + private static class CleanableHighAvailabilityServices extends AbstractTestingCleanableResource Review comment: Why can't we reuse `TestingHighAvailabilityServices` here? Same applies to other classes in this test. The reasoning behind this is the same as with using `mockito`: if you decide to change something in the interface, you have to go through all implementations one by one. 
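To make the `TestingHighAvailabilityServices` suggestion above concrete, here is a hypothetical sketch of how the test could be wired; the setter name is invented for illustration and may not match the actual testing class.

```java
// Sketch only: drive the assertions through the shared testing implementation instead
// of a bespoke CleanableHighAvailabilityServices subclass.
final CompletableFuture<JobID> globalCleanupFuture = new CompletableFuture<>();
final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
// hypothetical setter; the point is to reuse the existing testing class
haServices.setGlobalCleanupFuture(globalCleanupFuture);

// ... pass haServices to the DispatcherResourceCleanerFactory under test, then assert
// on the future, e.g.:
// assertThat(globalCleanupFuture).isNotDone();                   // before cleanup runs
// assertThat(globalCleanupFuture).isCompletedWithValue(JOB_ID);  // after global cleanup
```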
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/GloballyCleanableResource.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher.cleanup; + +import org.apache.flink.api.common.JobID; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +/** + * {@code GloballyCleanableResource} is supposed to be used by any class that provides artifacts for + * a given job that can be cleaned up globally. + * + * @see LocallyCleanableResource + */ +@FunctionalInterface +public interface GloballyCleanableResource { Review comment: also we should add some notes about thread safety guarantees (sync method needs to be thread safe) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
