This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6f798996dad0e41ced3e9e293ee81d0caf225874 Author: Matthias Pohl <[email protected]> AuthorDate: Wed Dec 15 14:54:27 2021 +0100 [FLINK-25432][runtime] Adds JobManagerRunnerRegistry and integrates it into the Dispatcher --- .../DefaultJobManagerRunnerRegistry.java | 127 ++++++++++++ .../flink/runtime/dispatcher/Dispatcher.java | 65 +++--- .../dispatcher/JobManagerRunnerRegistry.java | 68 ++++++ .../DefaultJobManagerRunnerRegistryTest.java | 230 +++++++++++++++++++++ .../TestingJobManagerRunnerRegistry.java | 199 ++++++++++++++++++ .../runtime/jobmaster/TestingJobManagerRunner.java | 6 +- 6 files changed, 664 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java new file mode 100644 index 0000000..2ba54e9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistry.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + +/** + * {@code DefaultJobManagerRunnerRegistry} is the default implementation of the {@link + * JobManagerRunnerRegistry} interface. All methods of this class are expected to be called from + * within the main thread. + */ +public class DefaultJobManagerRunnerRegistry implements JobManagerRunnerRegistry { + + @VisibleForTesting final Map<JobID, JobManagerRunner> jobManagerRunners; + private final ComponentMainThreadExecutor mainThreadExecutor; + + public DefaultJobManagerRunnerRegistry( + int initialCapacity, ComponentMainThreadExecutor mainThreadExecutor) { + Preconditions.checkArgument(initialCapacity > 0); + jobManagerRunners = new HashMap<>(initialCapacity); + this.mainThreadExecutor = mainThreadExecutor; + } + + @Override + public boolean isRegistered(JobID jobId) { + return jobManagerRunners.containsKey(jobId); + } + + @Override + public void register(JobManagerRunner jobManagerRunner) { + mainThreadExecutor.assertRunningInMainThread(); + Preconditions.checkArgument( + !isRegistered(jobManagerRunner.getJobID()), + "A job with the ID %s is already registered.", + jobManagerRunner.getJobID()); + this.jobManagerRunners.put(jobManagerRunner.getJobID(), jobManagerRunner); + } + + @Override + public JobManagerRunner get(JobID jobId) { + assertJobRegistered(jobId); + return 
this.jobManagerRunners.get(jobId); + } + + @Override + public int size() { + return this.jobManagerRunners.size(); + } + + @Override + public Set<JobID> getRunningJobIds() { + return new HashSet<>(this.jobManagerRunners.keySet()); + } + + @Override + public Collection<JobManagerRunner> getJobManagerRunners() { + return new ArrayList<>(this.jobManagerRunners.values()); + } + + @Override + public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor unusedExecutor) { + return cleanup(jobId); + } + + @Override + public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor unusedExecutor) { + return cleanup(jobId); + } + + private CompletableFuture<Void> cleanup(JobID jobId) { + mainThreadExecutor.assertRunningInMainThread(); + if (isRegistered(jobId)) { + try { + unregister(jobId).close(); + } catch (Exception e) { + return FutureUtils.completedExceptionally(e); + } + } + + return FutureUtils.completedVoidFuture(); + } + + @Override + public JobManagerRunner unregister(JobID jobId) { + mainThreadExecutor.assertRunningInMainThread(); + assertJobRegistered(jobId); + return this.jobManagerRunners.remove(jobId); + } + + private void assertJobRegistered(JobID jobId) { + if (!isRegistered(jobId)) { + throw new NoSuchElementException( + "There is no running job registered for the job ID " + jobId); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 6599115..88f9237 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -123,7 +123,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private final FatalErrorHandler fatalErrorHandler; - private final Map<JobID, JobManagerRunner> runningJobs; + private final JobManagerRunnerRegistry jobManagerRunnerRegistry; 
private final Collection<JobGraph> recoveredJobs; @@ -184,7 +184,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher JobManagerSharedServices.fromConfiguration( configuration, blobServer, fatalErrorHandler); - runningJobs = new HashMap<>(16); + jobManagerRunnerRegistry = + new DefaultJobManagerRunnerRegistry(16, this.getMainThreadExecutor()); this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist(); @@ -385,7 +386,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher * @throws FlinkException if the job scheduling status cannot be retrieved */ private boolean isDuplicateJob(JobID jobId) throws FlinkException { - return isInGloballyTerminalState(jobId) || runningJobs.containsKey(jobId); + return isInGloballyTerminalState(jobId) || jobManagerRunnerRegistry.isRegistered(jobId); } /** @@ -458,12 +459,12 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception { - Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID())); + Preconditions.checkState(!jobManagerRunnerRegistry.isRegistered(jobGraph.getJobID())); long initializationTimestamp = System.currentTimeMillis(); JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph, initializationTimestamp); - runningJobs.put(jobGraph.getJobID(), jobManagerRunner); + jobManagerRunnerRegistry.register(jobManagerRunner); final JobID jobId = jobGraph.getJobID(); @@ -473,7 +474,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher .handleAsync( (jobManagerRunnerResult, throwable) -> { Preconditions.checkState( - runningJobs.get(jobId) == jobManagerRunner, + jobManagerRunnerRegistry.isRegistered(jobId) + && jobManagerRunnerRegistry.get(jobId) + == jobManagerRunner, "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner."); if (jobManagerRunnerResult != 
null) { @@ -545,7 +548,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher @Override public CompletableFuture<Collection<JobID>> listJobs(Time timeout) { return CompletableFuture.completedFuture( - Collections.unmodifiableSet(new HashSet<>(runningJobs.keySet()))); + Collections.unmodifiableSet(jobManagerRunnerRegistry.getRunningJobIds())); } @Override @@ -690,9 +693,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher @Override public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) { - JobManagerRunner job = runningJobs.get(jobId); - - if (job == null) { + if (!jobManagerRunnerRegistry.isRegistered(jobId)) { final ExecutionGraphInfo executionGraphInfo = executionGraphInfoStore.get(jobId); if (executionGraphInfo == null) { @@ -701,15 +702,17 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher return CompletableFuture.completedFuture( JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph())); } - } else { - return job.getResultFuture() - .thenApply( - jobManagerRunnerResult -> - JobResult.createFrom( - jobManagerRunnerResult - .getExecutionGraphInfo() - .getArchivedExecutionGraph())); } + + final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId); + return jobManagerRunner + .getResultFuture() + .thenApply( + jobManagerRunnerResult -> + JobResult.createFrom( + jobManagerRunnerResult + .getExecutionGraphInfo() + .getArchivedExecutionGraph())); } @Override @@ -856,7 +859,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState cleanupJobState) { - final JobManagerRunner job = checkNotNull(runningJobs.remove(jobId)); + final JobManagerRunner job = checkNotNull(jobManagerRunnerRegistry.unregister(jobId)); return CompletableFuture.supplyAsync( () -> cleanUpJobGraph(jobId, cleanupJobState.cleanupHAData), ioExecutor) 
.thenCompose( @@ -958,7 +961,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher private void terminateRunningJobs() { log.info("Stopping all currently running jobs of dispatcher {}.", getAddress()); - final HashSet<JobID> jobsToRemove = new HashSet<>(runningJobs.keySet()); + final Set<JobID> jobsToRemove = jobManagerRunnerRegistry.getRunningJobIds(); for (JobID jobId : jobsToRemove) { terminateJob(jobId); @@ -966,9 +969,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } private void terminateJob(JobID jobId) { - final JobManagerRunner jobManagerRunner = runningJobs.get(jobId); - - if (jobManagerRunner != null) { + if (jobManagerRunnerRegistry.isRegistered(jobId)) { + final JobManagerRunner jobManagerRunner = jobManagerRunnerRegistry.get(jobId); jobManagerRunner.closeAsync(); } } @@ -1083,11 +1085,11 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher /** Ensures that the JobMasterGateway is available. */ private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobId) { - JobManagerRunner job = runningJobs.get(jobId); - if (job == null) { + if (!jobManagerRunnerRegistry.isRegistered(jobId)) { return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); } + final JobManagerRunner job = jobManagerRunnerRegistry.get(jobId); if (!job.isInitialized()) { return FutureUtils.completedExceptionally( new UnavailableDispatcherOperationException( @@ -1107,7 +1109,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } private Optional<JobManagerRunner> getJobManagerRunner(JobID jobId) { - return Optional.ofNullable(runningJobs.get(jobId)); + return jobManagerRunnerRegistry.isRegistered(jobId) + ? 
Optional.of(jobManagerRunnerRegistry.get(jobId)) + : Optional.empty(); } private <T> CompletableFuture<T> runResourceManagerCommand( @@ -1129,9 +1133,9 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher Function<JobManagerRunner, CompletableFuture<T>> queryFunction) { List<CompletableFuture<Optional<T>>> optionalJobInformation = - new ArrayList<>(runningJobs.size()); + new ArrayList<>(jobManagerRunnerRegistry.size()); - for (JobManagerRunner job : runningJobs.values()) { + for (JobManagerRunner job : jobManagerRunnerRegistry.getJobManagerRunners()) { final CompletableFuture<Optional<T>> queryResult = queryFunction .apply(job) @@ -1165,7 +1169,7 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } CompletableFuture<Void> getJobTerminationFuture(JobID jobId) { - if (runningJobs.containsKey(jobId)) { + if (jobManagerRunnerRegistry.isRegistered(jobId)) { return FutureUtils.completedExceptionally( new DispatcherException( String.format("Job with job id %s is still running.", jobId))); @@ -1176,7 +1180,8 @@ public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<Dispatcher } private void registerDispatcherMetrics(MetricGroup jobManagerMetricGroup) { - jobManagerMetricGroup.gauge(MetricNames.NUM_RUNNING_JOBS, () -> (long) runningJobs.size()); + jobManagerMetricGroup.gauge( + MetricNames.NUM_RUNNING_JOBS, () -> (long) jobManagerRunnerRegistry.size()); } public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java new file mode 100644 index 0000000..dfd342c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobManagerRunnerRegistry.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.cleanup.GloballyCleanableResource; +import org.apache.flink.runtime.dispatcher.cleanup.LocallyCleanableResource; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; + +import java.util.Collection; +import java.util.NoSuchElementException; +import java.util.Set; + +/** {@code JobManagerRunnerRegistry} collects running jobs represented by {@link JobManagerRunner}. */ +public interface JobManagerRunnerRegistry + extends LocallyCleanableResource, GloballyCleanableResource { + + /** + * Checks whether a {@link JobManagerRunner} is registered under the given {@link JobID}. + * + * @param jobId The {@code JobID} to check. + * @return {@code true}, if a {@code JobManagerRunner} is registered; {@code false} otherwise. + */ + boolean isRegistered(JobID jobId); + + /** Registers the given {@link JobManagerRunner} instance. */ + void register(JobManagerRunner jobManagerRunner); + + /** + * Returns the {@link JobManagerRunner} for the given {@code JobID}. + * + * @throws NoSuchElementException if the passed {@code JobID} does not belong to a registered + * {@code JobManagerRunner}. 
+ * @see #isRegistered(JobID) + */ + JobManagerRunner get(JobID jobId); + + /** Returns the number of {@link JobManagerRunner} instances currently registered. */ + int size(); + + /** Returns {@link JobID} instances of registered {@link JobManagerRunner} instances. */ + Set<JobID> getRunningJobIds(); + + /** Returns the registered {@link JobManagerRunner} instances. */ + Collection<JobManagerRunner> getJobManagerRunners(); + + /** + * Unregisters the {@link JobManagerRunner} with the given {@code JobID}. {@code null} is + * returned if there's no {@code JobManagerRunner} registered for the given {@link JobID}. + */ + JobManagerRunner unregister(JobID jobId); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java new file mode 100644 index 0000000..6eca873 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerRegistryTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.testutils.FlinkAssertions; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.concurrent.Executors; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * {@code DefaultJobManagerRunnerRegistryTest} tests the functionality of {@link + * DefaultJobManagerRunnerRegistry}. + */ +public class DefaultJobManagerRunnerRegistryTest { + + private JobManagerRunnerRegistry testInstance; + + @BeforeEach + public void setup() { + testInstance = + new DefaultJobManagerRunnerRegistry( + 4, ComponentMainThreadExecutorServiceAdapter.forMainThread()); + } + + @Test + public void testIsRegistered() { + final JobID jobId = new JobID(); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build()); + assertThat(testInstance.isRegistered(jobId)).isTrue(); + } + + @Test + public void testIsNotRegistered() { + assertThat(testInstance.isRegistered(new JobID())).isFalse(); + } + + @Test + public void testRegister() { + final JobID jobId = new JobID(); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build()); + assertThat(testInstance.isRegistered(jobId)).isTrue(); + } + + @Test + public void testRegisteringTwiceCausesFailure() { + final JobID jobId = new JobID(); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId).build()); + 
assertThat(testInstance.isRegistered(jobId)).isTrue(); + + assertThatThrownBy( + () -> + testInstance.register( + TestingJobManagerRunner.newBuilder() + .setJobId(jobId) + .build())) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testGet() { + final JobID jobId = new JobID(); + final JobManagerRunner jobManagerRunner = + TestingJobManagerRunner.newBuilder().setJobId(jobId).build(); + testInstance.register(jobManagerRunner); + + assertThat(testInstance.get(jobId)).isEqualTo(jobManagerRunner); + } + + @Test + public void testGetOnNonExistingJobManagerRunner() { + assertThatThrownBy(() -> testInstance.get(new JobID())) + .isInstanceOf(NoSuchElementException.class); + } + + @Test + public void size() { + assertThat(testInstance.size()).isEqualTo(0); + testInstance.register(TestingJobManagerRunner.newBuilder().build()); + assertThat(testInstance.size()).isEqualTo(1); + testInstance.register(TestingJobManagerRunner.newBuilder().build()); + assertThat(testInstance.size()).isEqualTo(2); + } + + @Test + public void testGetRunningJobIds() { + assertThat(testInstance.getRunningJobIds()).isEmpty(); + + final JobID jobId0 = new JobID(); + final JobID jobId1 = new JobID(); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId0).build()); + testInstance.register(TestingJobManagerRunner.newBuilder().setJobId(jobId1).build()); + + assertThat(testInstance.getRunningJobIds()).containsExactlyInAnyOrder(jobId0, jobId1); + } + + @Test + public void testGetJobManagerRunners() { + assertThat(testInstance.getJobManagerRunners()).isEmpty(); + + final JobManagerRunner jobManagerRunner0 = TestingJobManagerRunner.newBuilder().build(); + final JobManagerRunner jobManagerRunner1 = TestingJobManagerRunner.newBuilder().build(); + testInstance.register(jobManagerRunner0); + testInstance.register(jobManagerRunner1); + + assertThat(testInstance.getJobManagerRunners()) + .containsExactlyInAnyOrder(jobManagerRunner0, jobManagerRunner1); + } + + @Test 
+ public void testSuccessfulLocalCleanup() throws Throwable { + final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner(); + + assertThat( + testInstance.localCleanupAsync( + jobManagerRunner.getJobID(), Executors.directExecutor())) + .isCompleted(); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(jobManagerRunner.getTerminationFuture()).isCompleted(); + } + + @Test + public void testFailingLocalCleanup() { + final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner(); + + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue(); + assertThat(jobManagerRunner.getTerminationFuture()).isNotDone(); + + final RuntimeException expectedException = new RuntimeException("Expected exception"); + jobManagerRunner.completeTerminationFutureExceptionally(expectedException); + + assertThat( + testInstance.localCleanupAsync( + jobManagerRunner.getJobID(), Executors.directExecutor())) + .failsWithin(Duration.ZERO) + .withThrowableOfType(ExecutionException.class) + .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) + .hasExactlyElementsOfTypes( + ExecutionException.class, + FlinkException.class, + expectedException.getClass()) + .last() + .isEqualTo(expectedException); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + } + + @Test + public void testSuccessfulLocalCleanupAsync() throws Exception { + final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner(); + + final CompletableFuture<Void> cleanupResult = + testInstance.localCleanupAsync( + jobManagerRunner.getJobID(), Executors.directExecutor()); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(cleanupResult).isCompleted(); + } + + @Test + public void testFailingLocalCleanupAsync() throws Exception { + final TestingJobManagerRunner jobManagerRunner = registerTestingJobManagerRunner(); + + 
assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue(); + assertThat(jobManagerRunner.getTerminationFuture()).isNotDone(); + + final RuntimeException expectedException = new RuntimeException("Expected exception"); + jobManagerRunner.completeTerminationFutureExceptionally(expectedException); + + final CompletableFuture<Void> cleanupResult = + testInstance.localCleanupAsync( + jobManagerRunner.getJobID(), Executors.directExecutor()); + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isFalse(); + assertThat(cleanupResult) + .isCompletedExceptionally() + .failsWithin(Duration.ZERO) + .withThrowableOfType(ExecutionException.class) + .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) + .hasExactlyElementsOfTypes( + ExecutionException.class, + FlinkException.class, + expectedException.getClass()) + .last() + .isEqualTo(expectedException); + } + + private TestingJobManagerRunner registerTestingJobManagerRunner() { + final TestingJobManagerRunner jobManagerRunner = + TestingJobManagerRunner.newBuilder().build(); + testInstance.register(jobManagerRunner); + + assertThat(testInstance.isRegistered(jobManagerRunner.getJobID())).isTrue(); + assertThat(jobManagerRunner.getTerminationFuture()).isNotDone(); + + return jobManagerRunner; + } + + @Test + public void testLocalCleanupAsyncOnUnknownJobId() { + assertThat(testInstance.localCleanupAsync(new JobID(), Executors.directExecutor())) + .isCompleted(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java new file mode 100644 index 0000000..c9e60df --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/TestingJobManagerRunnerRegistry.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JobManagerRunner; +import org.apache.flink.util.concurrent.FutureUtils; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * {@code TestingJobManagerRunnerRegistry} is a test implementation of {@link + * JobManagerRunnerRegistry}. 
+ */ +public class TestingJobManagerRunnerRegistry implements JobManagerRunnerRegistry { + + private final Function<JobID, Boolean> isRegisteredFunction; + private final Consumer<JobManagerRunner> registerConsumer; + private final Function<JobID, JobManagerRunner> getFunction; + private final Supplier<Integer> sizeSupplier; + private final Supplier<Set<JobID>> getRunningJobIdsSupplier; + private final Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier; + private final Function<JobID, JobManagerRunner> unregisterFunction; + private final BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction; + private final BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction; + + private TestingJobManagerRunnerRegistry( + Function<JobID, Boolean> isRegisteredFunction, + Consumer<JobManagerRunner> registerConsumer, + Function<JobID, JobManagerRunner> getFunction, + Supplier<Integer> sizeSupplier, + Supplier<Set<JobID>> getRunningJobIdsSupplier, + Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier, + Function<JobID, JobManagerRunner> unregisterFunction, + BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction, + BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction) { + this.isRegisteredFunction = isRegisteredFunction; + this.registerConsumer = registerConsumer; + this.getFunction = getFunction; + this.sizeSupplier = sizeSupplier; + this.getRunningJobIdsSupplier = getRunningJobIdsSupplier; + this.getJobManagerRunnersSupplier = getJobManagerRunnersSupplier; + this.unregisterFunction = unregisterFunction; + this.localCleanupAsyncFunction = localCleanupAsyncFunction; + this.globalCleanupAsyncFunction = globalCleanupAsyncFunction; + } + + @Override + public boolean isRegistered(JobID jobId) { + return isRegisteredFunction.apply(jobId); + } + + @Override + public void register(JobManagerRunner jobManagerRunner) { + 
registerConsumer.accept(jobManagerRunner); + } + + @Override + public JobManagerRunner get(JobID jobId) { + return getFunction.apply(jobId); + } + + @Override + public int size() { + return sizeSupplier.get(); + } + + @Override + public Set<JobID> getRunningJobIds() { + return getRunningJobIdsSupplier.get(); + } + + @Override + public Collection<JobManagerRunner> getJobManagerRunners() { + return getJobManagerRunnersSupplier.get(); + } + + @Override + public JobManagerRunner unregister(JobID jobId) { + return unregisterFunction.apply(jobId); + } + + @Override + public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) { + return localCleanupAsyncFunction.apply(jobId, executor); + } + + @Override + public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) { + return globalCleanupAsyncFunction.apply(jobId, executor); + } + + public static Builder builder() { + return new Builder(); + } + + /** {@code Builder} for creating {@code TestingJobManagerRunnerRegistry} instances. 
*/ + public static class Builder { + + private Function<JobID, Boolean> isRegisteredFunction = ignoredJobId -> true; + private Consumer<JobManagerRunner> registerConsumer = ignoredRunner -> {}; + private Function<JobID, JobManagerRunner> getFunction = ignoredJobId -> null; + private Supplier<Integer> sizeSupplier = () -> 0; + private Supplier<Set<JobID>> getRunningJobIdsSupplier = Collections::emptySet; + private Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier = + Collections::emptyList; + private Function<JobID, JobManagerRunner> unregisterFunction = ignoredJobId -> null; + private BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction = + (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture(); + private BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction = + (ignoredJobId, ignoredExecutor) -> FutureUtils.completedVoidFuture(); + + public Builder withIsRegisteredFunction(Function<JobID, Boolean> isRegisteredFunction) { + this.isRegisteredFunction = isRegisteredFunction; + return this; + } + + public Builder withRegisterConsumer(Consumer<JobManagerRunner> registerConsumer) { + this.registerConsumer = registerConsumer; + return this; + } + + public Builder withGetFunction(Function<JobID, JobManagerRunner> getFunction) { + this.getFunction = getFunction; + return this; + } + + public Builder withSizeSupplier(Supplier<Integer> sizeSupplier) { + this.sizeSupplier = sizeSupplier; + return this; + } + + public Builder withGetRunningJobIdsSupplier(Supplier<Set<JobID>> getRunningJobIdsSupplier) { + this.getRunningJobIdsSupplier = getRunningJobIdsSupplier; + return this; + } + + public Builder withGetJobManagerRunnersSupplier( + Supplier<Collection<JobManagerRunner>> getJobManagerRunnersSupplier) { + this.getJobManagerRunnersSupplier = getJobManagerRunnersSupplier; + return this; + } + + public Builder withUnregisterFunction( + Function<JobID, JobManagerRunner> unregisterFunction) { + 
this.unregisterFunction = unregisterFunction; + return this; + } + + public Builder withLocalCleanupAsyncFunction( + BiFunction<JobID, Executor, CompletableFuture<Void>> localCleanupAsyncFunction) { + this.localCleanupAsyncFunction = localCleanupAsyncFunction; + return this; + } + + public Builder withGlobalCleanupAsyncFunction( + BiFunction<JobID, Executor, CompletableFuture<Void>> globalCleanupAsyncFunction) { + this.globalCleanupAsyncFunction = globalCleanupAsyncFunction; + return this; + } + + public TestingJobManagerRunnerRegistry build() { + return new TestingJobManagerRunnerRegistry( + isRegisteredFunction, + registerConsumer, + getFunction, + sizeSupplier, + getRunningJobIdsSupplier, + getJobManagerRunnersSupplier, + unregisterFunction, + localCleanupAsyncFunction, + globalCleanupAsyncFunction); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java index 0c0994d..7b6adf5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java @@ -155,6 +155,10 @@ public class TestingJobManagerRunner implements JobManagerRunner { terminationFuture.complete(null); } + public void completeTerminationFutureExceptionally(Throwable expectedException) { + terminationFuture.completeExceptionally(expectedException); + } + public CompletableFuture<Void> getTerminationFuture() { return terminationFuture; } @@ -166,7 +170,7 @@ public class TestingJobManagerRunner implements JobManagerRunner { /** {@code Builder} for instantiating {@link TestingJobManagerRunner} instances. 
*/ public static class Builder { - private JobID jobId = null; + private JobID jobId = new JobID(); private boolean blockingTermination = false; private CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = new CompletableFuture<>();
