This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 347becbe43209fb9c65bcc8ae3859a071469e587 Author: Zhu Zhu <[email protected]> AuthorDate: Fri Oct 15 11:52:37 2021 +0800 [FLINK-19142][runtime] Fix slot hijacking after task failover This closes #15229. --- .../flink/runtime/scheduler/DefaultScheduler.java | 49 ++++++ .../scheduler/ExecutionSlotAllocationContext.java | 11 ++ .../MergingSharedSlotProfileRetrieverFactory.java | 31 ++-- .../SlotSharingExecutionSlotAllocatorFactory.java | 4 +- .../MergingSharedSlotProfileRetrieverTest.java | 37 ++++- .../TestExecutionSlotAllocatorFactory.java | 7 + .../DefaultSchedulerLocalRecoveryITCase.java | 181 +++++++++++++++++++++ 7 files changed, 296 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 9678786..fdfa238 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -63,6 +63,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -104,6 +105,13 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio private final Time rpcTimeout; + private final Map<AllocationID, Long> reservedAllocationRefCounters; + + // once an execution vertex is assigned an allocation/slot, it will reserve the allocation + // until it is assigned a new allocation, or it finishes and does not need the allocation + // anymore. The reserved allocation information is needed for local recovery. 
+ private final Map<ExecutionVertexID, AllocationID> reservedAllocationByExecutionVertex; + DefaultScheduler( final Logger log, final JobGraph jobGraph, @@ -152,6 +160,9 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio this.shuffleMaster = checkNotNull(shuffleMaster); this.rpcTimeout = checkNotNull(rpcTimeout); + this.reservedAllocationRefCounters = new HashMap<>(); + this.reservedAllocationByExecutionVertex = new HashMap<>(); + final FailoverStrategy failoverStrategy = failoverStrategyFactory.create( getSchedulingTopology(), getResultPartitionAvailabilityChecker()); @@ -207,6 +218,16 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio final ExecutionVertexID executionVertexId, final TaskExecutionStateTransition taskExecutionState) { + // once a task finishes, it will release the assigned allocation/slot and no longer + // needs it. Therefore, it should stop reserving the slot so that other tasks can + // use the slot. Ideally, the `stopReserveAllocation` should happen + // along with the slot release process. However, that process is hidden in the depths + // of the ExecutionGraph, so we currently do it in DefaultScheduler after that process + // is done.
+ if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) { + stopReserveAllocation(executionVertexId); + } + schedulingStrategy.onExecutionStateChange( executionVertexId, taskExecutionState.getExecutionState()); maybeHandleTaskFailure(taskExecutionState, executionVertexId); @@ -520,10 +541,33 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); executionVertex.tryAssignResource(logicalSlot); + + startReserveAllocation(executionVertexId, logicalSlot.getAllocationId()); + return logicalSlot; }; } + private void startReserveAllocation( + ExecutionVertexID executionVertexId, AllocationID newAllocation) { + + // stop the previous allocation reservation if there is one + stopReserveAllocation(executionVertexId); + + reservedAllocationByExecutionVertex.put(executionVertexId, newAllocation); + reservedAllocationRefCounters.compute( + newAllocation, (ignored, oldCount) -> oldCount == null ? 1 : oldCount + 1); + } + + private void stopReserveAllocation(ExecutionVertexID executionVertexId) { + final AllocationID priorAllocation = + reservedAllocationByExecutionVertex.remove(executionVertexId); + if (priorAllocation != null) { + reservedAllocationRefCounters.compute( + priorAllocation, (ignored, oldCount) -> oldCount > 1 ? 
oldCount - 1 : null); + } + } + private Function<LogicalSlot, CompletableFuture<Void>> registerProducedPartitions( final DeploymentHandle deploymentHandle) { final ExecutionVertexID executionVertexId = deploymentHandle.getExecutionVertexId(); @@ -672,6 +716,11 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio public Optional<TaskManagerLocation> getStateLocation(ExecutionVertexID executionVertexId) { return stateLocationRetriever.getStateLocation(executionVertexId); } + + @Override + public Set<AllocationID> getReservedAllocations() { + return reservedAllocationRefCounters.keySet(); + } } private void enrichResourceProfile() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java index b089a35..0320fac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.scheduler; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; @@ -67,4 +68,14 @@ interface ExecutionSlotAllocationContext extends InputsLocationsRetriever, State * @return all co-location groups in the job */ Set<CoLocationGroup> getCoLocationGroups(); + + /** + * Returns all reserved allocations. These allocations/slots were used to run certain vertices + * and reserving them can prevent other vertices from taking these slots and thus help vertices to + * be deployed into their previous slots again after failover. It is needed if {@link + * CheckpointingOptions#LOCAL_RECOVERY} is enabled.
+ * + * @return all reserved allocations + */ + Set<AllocationID> getReservedAllocations(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java index 42ed8e0..1fbde93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java @@ -28,10 +28,9 @@ import org.apache.flink.util.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; -import java.util.Objects; import java.util.Set; import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.function.Supplier; /** Factory for {@link MergingSharedSlotProfileRetriever}. */ class MergingSharedSlotProfileRetrieverFactory @@ -40,21 +39,21 @@ class MergingSharedSlotProfileRetrieverFactory private final Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever; + private final Supplier<Set<AllocationID>> reservedAllocationIdsRetriever; + MergingSharedSlotProfileRetrieverFactory( SyncPreferredLocationsRetriever preferredLocationsRetriever, - Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever) { + Function<ExecutionVertexID, AllocationID> priorAllocationIdRetriever, + Supplier<Set<AllocationID>> reservedAllocationIdsRetriever) { this.preferredLocationsRetriever = Preconditions.checkNotNull(preferredLocationsRetriever); this.priorAllocationIdRetriever = Preconditions.checkNotNull(priorAllocationIdRetriever); + this.reservedAllocationIdsRetriever = + Preconditions.checkNotNull(reservedAllocationIdsRetriever); } @Override public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> bulk) { - Set<AllocationID> allPriorAllocationIds = - bulk.stream() - 
.map(priorAllocationIdRetriever) - .filter(Objects::nonNull) - .collect(Collectors.toSet()); - return new MergingSharedSlotProfileRetriever(allPriorAllocationIds, bulk); + return new MergingSharedSlotProfileRetriever(reservedAllocationIdsRetriever.get(), bulk); } /** @@ -62,16 +61,15 @@ class MergingSharedSlotProfileRetrieverFactory * schedule. */ private class MergingSharedSlotProfileRetriever implements SharedSlotProfileRetriever { - /** All previous {@link AllocationID}s of the bulk to schedule. */ - private final Set<AllocationID> allBulkPriorAllocationIds; + /** All reserved {@link AllocationID}s of the job. */ + private final Set<AllocationID> reservedAllocationIds; /** All {@link ExecutionVertexID}s of the bulk. */ private final Set<ExecutionVertexID> producersToIgnore; private MergingSharedSlotProfileRetriever( - Set<AllocationID> allBulkPriorAllocationIds, - Set<ExecutionVertexID> producersToIgnore) { - this.allBulkPriorAllocationIds = Preconditions.checkNotNull(allBulkPriorAllocationIds); + Set<AllocationID> reservedAllocationIds, Set<ExecutionVertexID> producersToIgnore) { + this.reservedAllocationIds = Preconditions.checkNotNull(reservedAllocationIds); this.producersToIgnore = Preconditions.checkNotNull(producersToIgnore); } @@ -86,8 +84,7 @@ class MergingSharedSlotProfileRetrieverFactory * <p>The preferred {@link AllocationID}s of the {@link SlotProfile} are all previous {@link * AllocationID}s of all executions sharing the slot. * - * <p>The {@link SlotProfile} also refers to all previous {@link AllocationID}s of all - * executions within the bulk. + * <p>The {@link SlotProfile} also refers to all reserved {@link AllocationID}s of the job. * * @param executionSlotSharingGroup executions sharing the slot. * @param physicalSlotResourceProfile {@link ResourceProfile} of the slot. 
@@ -110,7 +107,7 @@ class MergingSharedSlotProfileRetrieverFactory physicalSlotResourceProfile, preferredLocations, priorAllocations, - allBulkPriorAllocationIds); + reservedAllocationIds); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java index 229f2be..63940f5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java @@ -72,7 +72,9 @@ class SlotSharingExecutionSlotAllocatorFactory implements ExecutionSlotAllocator new DefaultSyncPreferredLocationsRetriever(context, context); SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = new MergingSharedSlotProfileRetrieverFactory( - preferredLocationsRetriever, context::getPriorAllocationId); + preferredLocationsRetriever, + context::getPriorAllocationId, + context::getReservedAllocations); return new SlotSharingExecutionSlotAllocator( slotProvider, slotWillBeOccupiedIndefinitely, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java index 7102a28..b8e6934 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.FlinkRuntimeException; +import 
org.apache.flink.util.TestLogger; import org.junit.Test; @@ -51,7 +52,7 @@ import static org.junit.Assert.assertThat; /** * Tests for {@link org.apache.flink.runtime.scheduler.MergingSharedSlotProfileRetrieverFactory}. */ -public class MergingSharedSlotProfileRetrieverTest { +public class MergingSharedSlotProfileRetrieverTest extends TestLogger { private static final SyncPreferredLocationsRetriever EMPTY_PREFERRED_LOCATIONS_RETRIEVER = (executionVertexId, producersToIgnore) -> Collections.emptyList(); @@ -61,7 +62,8 @@ public class MergingSharedSlotProfileRetrieverTest { SharedSlotProfileRetriever sharedSlotProfileRetriever = new MergingSharedSlotProfileRetrieverFactory( EMPTY_PREFERRED_LOCATIONS_RETRIEVER, - executionVertexID -> new AllocationID()) + executionVertexID -> new AllocationID(), + () -> Collections.emptySet()) .createFromBulk(Collections.emptySet()); SlotProfile slotProfile = @@ -106,6 +108,7 @@ public class MergingSharedSlotProfileRetrieverTest { locations.put(executions.get(0), Arrays.asList(allLocations.get(0), allLocations.get(1))); locations.put(executions.get(1), Arrays.asList(allLocations.get(1), allLocations.get(2))); + List<AllocationID> prevAllocationIds = Collections.nCopies(3, new AllocationID()); SlotProfile slotProfile = getSlotProfile( (executionVertexId, producersToIgnore) -> { @@ -114,7 +117,8 @@ public class MergingSharedSlotProfileRetrieverTest { }, executions, ResourceProfile.ZERO, - Collections.nCopies(3, new AllocationID()), + prevAllocationIds, + prevAllocationIds, 2); assertThat( @@ -135,7 +139,8 @@ public class MergingSharedSlotProfileRetrieverTest { } @Test - public void testAllocationIdsOfSlotProfile() throws ExecutionException, InterruptedException { + public void testPreferredAllocationsOfSlotProfile() + throws ExecutionException, InterruptedException { AllocationID prevAllocationID1 = new AllocationID(); AllocationID prevAllocationID2 = new AllocationID(); List<AllocationID> prevAllocationIDs = @@ -146,9 +151,26 @@ public 
class MergingSharedSlotProfileRetrieverTest { assertThat( slotProfile.getPreferredAllocations(), containsInAnyOrder(prevAllocationID1, prevAllocationID2)); + } + + @Test + public void testReservedAllocationsOfSlotProfile() + throws ExecutionException, InterruptedException { + List<AllocationID> reservedAllocationIds = + Arrays.asList(new AllocationID(), new AllocationID(), new AllocationID()); + + SlotProfile slotProfile = + getSlotProfile( + EMPTY_PREFERRED_LOCATIONS_RETRIEVER, + Collections.emptyList(), + ResourceProfile.ZERO, + Collections.emptyList(), + reservedAllocationIds, + 0); + assertThat( slotProfile.getReservedAllocations(), - containsInAnyOrder(prevAllocationIDs.toArray())); + containsInAnyOrder(reservedAllocationIds.toArray())); } private static SlotProfile getSlotProfile( @@ -165,6 +187,7 @@ public class MergingSharedSlotProfileRetrieverTest { executions, resourceProfile, prevAllocationIDs, + prevAllocationIDs, executionSlotSharingGroupSize); } @@ -173,6 +196,7 @@ public class MergingSharedSlotProfileRetrieverTest { List<ExecutionVertexID> executions, ResourceProfile resourceProfile, List<AllocationID> prevAllocationIDs, + Collection<AllocationID> reservedAllocationIds, int executionSlotSharingGroupSize) throws ExecutionException, InterruptedException { SharedSlotProfileRetriever sharedSlotProfileRetriever = @@ -180,7 +204,8 @@ public class MergingSharedSlotProfileRetrieverTest { preferredLocationsRetriever, executionVertexID -> prevAllocationIDs.get( - executions.indexOf(executionVertexID))) + executions.indexOf(executionVertexID)), + () -> new HashSet<>(reservedAllocationIds)) .createFromBulk(new HashSet<>(executions)); ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java index 9d71af5..c72d957 
100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java @@ -27,6 +27,8 @@ public class TestExecutionSlotAllocatorFactory implements ExecutionSlotAllocator private final TestExecutionSlotAllocator testExecutionSlotAllocator; + private ExecutionSlotAllocationContext latestExecutionSlotAllocationContext; + public TestExecutionSlotAllocatorFactory() { this.testExecutionSlotAllocator = new TestExecutionSlotAllocator(); } @@ -41,10 +43,15 @@ public class TestExecutionSlotAllocatorFactory implements ExecutionSlotAllocator @Override public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContext context) { + this.latestExecutionSlotAllocationContext = context; return testExecutionSlotAllocator; } public TestExecutionSlotAllocator getTestExecutionSlotAllocator() { return testExecutionSlotAllocator; } + + public ExecutionSlotAllocationContext getLatestExecutionSlotAllocationContext() { + return latestExecutionSlotAllocationContext; + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java new file mode 100644 index 0000000..2f16fe7 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.scheduler.DefaultScheduler; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.WaitingCancelableInvokable; +import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import 
java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertNotNull; + +/** IT case to test local recovery using {@link DefaultScheduler}. */ +public class DefaultSchedulerLocalRecoveryITCase extends TestLogger { + + private static final long TIMEOUT = 10_000L; + + @Test + @Category(FailsWithAdaptiveScheduler.class) // FLINK-21450 + public void testLocalRecoveryFull() throws Exception { + testLocalRecoveryInternal("full"); + } + + @Test + @Category(FailsWithAdaptiveScheduler.class) // FLINK-21450 + public void testLocalRecoveryRegion() throws Exception { + testLocalRecoveryInternal("region"); + } + + private void testLocalRecoveryInternal(String failoverStrategyValue) throws Exception { + final Configuration configuration = new Configuration(); + configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true); + configuration.setString(EXECUTION_FAILOVER_STRATEGY.key(), failoverStrategyValue); + + final int parallelism = 10; + final ArchivedExecutionGraph graph = executeSchedulingTest(configuration, parallelism); + assertNonLocalRecoveredTasksEquals(graph, 1); + } + + private void assertNonLocalRecoveredTasksEquals(ArchivedExecutionGraph graph, int expected) { + int nonLocalRecoveredTasks = 0; + for (ArchivedExecutionVertex vertex : graph.getAllExecutionVertices()) { + int currentAttemptNumber = vertex.getCurrentExecutionAttempt().getAttemptNumber(); + if (currentAttemptNumber == 0) { + // the task has never been restarted and does not need to be recovered + continue; + } + AllocationID priorAllocation = + vertex.getPriorExecutionAttempt(currentAttemptNumber - 1) + .getAssignedAllocationID(); + AllocationID currentAllocation = +
vertex.getCurrentExecutionAttempt().getAssignedAllocationID(); + + assertNotNull(priorAllocation); + assertNotNull(currentAllocation); + if (!currentAllocation.equals(priorAllocation)) { + nonLocalRecoveredTasks++; + } + } + assertThat(nonLocalRecoveredTasks, is(expected)); + } + + private ArchivedExecutionGraph executeSchedulingTest( + Configuration configuration, int parallelism) throws Exception { + configuration.setString(RestOptions.BIND_PORT, "0"); + + final long slotIdleTimeout = TIMEOUT; + configuration.setLong(JobManagerOptions.SLOT_IDLE_TIMEOUT, slotIdleTimeout); + + configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("64mb")); + configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, MemorySize.parse("16mb")); + configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, MemorySize.parse("16mb")); + + final MiniClusterConfiguration miniClusterConfiguration = + new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumTaskManagers(parallelism) + .setNumSlotsPerTaskManager(1) + .build(); + + try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) { + miniCluster.start(); + + MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster); + + JobGraph jobGraph = createJobGraph(parallelism); + + // wait for the submission to succeed + JobID jobId = miniClusterClient.submitJob(jobGraph).get(TIMEOUT, TimeUnit.SECONDS); + + // wait until all tasks running before triggering task failures + waitUntilAllVerticesRunning(jobId, miniCluster); + + // kill one TM to trigger task failure and remove one existing slot + CompletableFuture<Void> terminationFuture = miniCluster.terminateTaskManager(0); + terminationFuture.get(); + + // restart a taskmanager as a replacement for the killed one + miniCluster.startTaskManager(); + + // wait until all tasks running again + waitUntilAllVerticesRunning(jobId, miniCluster); + + ArchivedExecutionGraph graph = + 
miniCluster.getArchivedExecutionGraph(jobGraph.getJobID()).get(); + + miniCluster.cancelJob(jobId).get(); + + return graph; + } + } + + private void waitUntilAllVerticesRunning(JobID jobId, MiniCluster miniCluster) + throws Exception { + CommonTestUtils.waitForAllTaskRunning( + () -> miniCluster.getExecutionGraph(jobId).get(TIMEOUT, TimeUnit.SECONDS), + Deadline.fromNow(Duration.ofMillis(TIMEOUT)), + false); + } + + private JobGraph createJobGraph(int parallelism) throws IOException { + final JobVertex source = new JobVertex("v1"); + source.setInvokableClass(WaitingCancelableInvokable.class); + source.setParallelism(parallelism); + + ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10)); + + return JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertices(Arrays.asList(source)) + .setExecutionConfig(executionConfig) + .build(); + } +}
