This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 347becbe43209fb9c65bcc8ae3859a071469e587
Author: Zhu Zhu <[email protected]>
AuthorDate: Fri Oct 15 11:52:37 2021 +0800

    [FLINK-19142][runtime] Fix slot hijacking after task failover
    
    This closes #15229.
---
 .../flink/runtime/scheduler/DefaultScheduler.java  |  49 ++++++
 .../scheduler/ExecutionSlotAllocationContext.java  |  11 ++
 .../MergingSharedSlotProfileRetrieverFactory.java  |  31 ++--
 .../SlotSharingExecutionSlotAllocatorFactory.java  |   4 +-
 .../MergingSharedSlotProfileRetrieverTest.java     |  37 ++++-
 .../TestExecutionSlotAllocatorFactory.java         |   7 +
 .../DefaultSchedulerLocalRecoveryITCase.java       | 181 +++++++++++++++++++++
 7 files changed, 296 insertions(+), 24 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 9678786..fdfa238 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -63,6 +63,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -104,6 +105,13 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
     private final Time rpcTimeout;
 
+    private final Map<AllocationID, Long> reservedAllocationRefCounters;
+
+    // once an execution vertex is assigned an allocation/slot, it will 
reserve the allocation
+    // until it is assigned a new allocation, or it finishes and does not need 
the allocation
+    // anymore. The reserved allocation information is needed for local 
recovery.
+    private final Map<ExecutionVertexID, AllocationID> 
reservedAllocationByExecutionVertex;
+
     DefaultScheduler(
             final Logger log,
             final JobGraph jobGraph,
@@ -152,6 +160,9 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
         this.shuffleMaster = checkNotNull(shuffleMaster);
         this.rpcTimeout = checkNotNull(rpcTimeout);
 
+        this.reservedAllocationRefCounters = new HashMap<>();
+        this.reservedAllocationByExecutionVertex = new HashMap<>();
+
         final FailoverStrategy failoverStrategy =
                 failoverStrategyFactory.create(
                         getSchedulingTopology(), 
getResultPartitionAvailabilityChecker());
@@ -207,6 +218,16 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
             final ExecutionVertexID executionVertexId,
             final TaskExecutionStateTransition taskExecutionState) {
 
+        // once a task finishes, it will release the assigned allocation/slot 
and no longer
+        // needs it. Therefore, it should stop reserving the slot so that 
other tasks are
+        // able to use the slot. Ideally, the `stopReserveAllocation` 
should happen
+        // along with the release slot process. However, that process is 
hidden in the depth
+        // of the ExecutionGraph, so we currently do it in DefaultScheduler 
after that process
+        // is done.
+        if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) 
{
+            stopReserveAllocation(executionVertexId);
+        }
+
         schedulingStrategy.onExecutionStateChange(
                 executionVertexId, taskExecutionState.getExecutionState());
         maybeHandleTaskFailure(taskExecutionState, executionVertexId);
@@ -520,10 +541,33 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
             final ExecutionVertex executionVertex = 
getExecutionVertex(executionVertexId);
             executionVertex.tryAssignResource(logicalSlot);
+
+            startReserveAllocation(executionVertexId, 
logicalSlot.getAllocationId());
+
             return logicalSlot;
         };
     }
 
+    private void startReserveAllocation(
+            ExecutionVertexID executionVertexId, AllocationID newAllocation) {
+
+        // stop the previous allocation reservation if there is one
+        stopReserveAllocation(executionVertexId);
+
+        reservedAllocationByExecutionVertex.put(executionVertexId, 
newAllocation);
+        reservedAllocationRefCounters.compute(
+                newAllocation, (ignored, oldCount) -> oldCount == null ? 1 : 
oldCount + 1);
+    }
+
+    private void stopReserveAllocation(ExecutionVertexID executionVertexId) {
+        final AllocationID priorAllocation =
+                reservedAllocationByExecutionVertex.remove(executionVertexId);
+        if (priorAllocation != null) {
+            reservedAllocationRefCounters.compute(
+                    priorAllocation, (ignored, oldCount) -> oldCount > 1 ? 
oldCount - 1 : null);
+        }
+    }
+
     private Function<LogicalSlot, CompletableFuture<Void>> 
registerProducedPartitions(
             final DeploymentHandle deploymentHandle) {
         final ExecutionVertexID executionVertexId = 
deploymentHandle.getExecutionVertexId();
@@ -672,6 +716,11 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
         public Optional<TaskManagerLocation> 
getStateLocation(ExecutionVertexID executionVertexId) {
             return stateLocationRetriever.getStateLocation(executionVertexId);
         }
+
+        @Override
+        public Set<AllocationID> getReservedAllocations() {
+            return reservedAllocationRefCounters.keySet();
+        }
     }
 
     private void enrichResourceProfile() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
index b089a35..0320fac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
@@ -67,4 +68,14 @@ interface ExecutionSlotAllocationContext extends 
InputsLocationsRetriever, State
      * @return all co-location groups in the job
      */
     Set<CoLocationGroup> getCoLocationGroups();
+
+    /**
+     * Returns all reserved allocations. These allocations/slots were used to 
run certain vertices
+     * and reserving them can prevent other vertices from taking these slots and 
thus help vertices to
+     * be deployed into their previous slots again after failover. It is 
needed if {@link
+     * CheckpointingOptions#LOCAL_RECOVERY} is enabled.
+     *
+     * @return all reserved allocations
+     */
+    Set<AllocationID> getReservedAllocations();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
index 42ed8e0..1fbde93 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
@@ -28,10 +28,9 @@ import org.apache.flink.util.Preconditions;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
-import java.util.stream.Collectors;
+import java.util.function.Supplier;
 
 /** Factory for {@link MergingSharedSlotProfileRetriever}. */
 class MergingSharedSlotProfileRetrieverFactory
@@ -40,21 +39,21 @@ class MergingSharedSlotProfileRetrieverFactory
 
     private final Function<ExecutionVertexID, AllocationID> 
priorAllocationIdRetriever;
 
+    private final Supplier<Set<AllocationID>> reservedAllocationIdsRetriever;
+
     MergingSharedSlotProfileRetrieverFactory(
             SyncPreferredLocationsRetriever preferredLocationsRetriever,
-            Function<ExecutionVertexID, AllocationID> 
priorAllocationIdRetriever) {
+            Function<ExecutionVertexID, AllocationID> 
priorAllocationIdRetriever,
+            Supplier<Set<AllocationID>> reservedAllocationIdsRetriever) {
         this.preferredLocationsRetriever = 
Preconditions.checkNotNull(preferredLocationsRetriever);
         this.priorAllocationIdRetriever = 
Preconditions.checkNotNull(priorAllocationIdRetriever);
+        this.reservedAllocationIdsRetriever =
+                Preconditions.checkNotNull(reservedAllocationIdsRetriever);
     }
 
     @Override
     public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID> 
bulk) {
-        Set<AllocationID> allPriorAllocationIds =
-                bulk.stream()
-                        .map(priorAllocationIdRetriever)
-                        .filter(Objects::nonNull)
-                        .collect(Collectors.toSet());
-        return new MergingSharedSlotProfileRetriever(allPriorAllocationIds, 
bulk);
+        return new 
MergingSharedSlotProfileRetriever(reservedAllocationIdsRetriever.get(), bulk);
     }
 
     /**
@@ -62,16 +61,15 @@ class MergingSharedSlotProfileRetrieverFactory
      * schedule.
      */
     private class MergingSharedSlotProfileRetriever implements 
SharedSlotProfileRetriever {
-        /** All previous {@link AllocationID}s of the bulk to schedule. */
-        private final Set<AllocationID> allBulkPriorAllocationIds;
+        /** All reserved {@link AllocationID}s of the job. */
+        private final Set<AllocationID> reservedAllocationIds;
 
         /** All {@link ExecutionVertexID}s of the bulk. */
         private final Set<ExecutionVertexID> producersToIgnore;
 
         private MergingSharedSlotProfileRetriever(
-                Set<AllocationID> allBulkPriorAllocationIds,
-                Set<ExecutionVertexID> producersToIgnore) {
-            this.allBulkPriorAllocationIds = 
Preconditions.checkNotNull(allBulkPriorAllocationIds);
+                Set<AllocationID> reservedAllocationIds, 
Set<ExecutionVertexID> producersToIgnore) {
+            this.reservedAllocationIds = 
Preconditions.checkNotNull(reservedAllocationIds);
             this.producersToIgnore = 
Preconditions.checkNotNull(producersToIgnore);
         }
 
@@ -86,8 +84,7 @@ class MergingSharedSlotProfileRetrieverFactory
          * <p>The preferred {@link AllocationID}s of the {@link SlotProfile} 
are all previous {@link
          * AllocationID}s of all executions sharing the slot.
          *
-         * <p>The {@link SlotProfile} also refers to all previous {@link 
AllocationID}s of all
-         * executions within the bulk.
+         * <p>The {@link SlotProfile} also refers to all reserved {@link 
AllocationID}s of the job.
          *
          * @param executionSlotSharingGroup executions sharing the slot.
          * @param physicalSlotResourceProfile {@link ResourceProfile} of the 
slot.
@@ -110,7 +107,7 @@ class MergingSharedSlotProfileRetrieverFactory
                     physicalSlotResourceProfile,
                     preferredLocations,
                     priorAllocations,
-                    allBulkPriorAllocationIds);
+                    reservedAllocationIds);
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
index 229f2be..63940f5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java
@@ -72,7 +72,9 @@ class SlotSharingExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocator
                 new DefaultSyncPreferredLocationsRetriever(context, context);
         SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory =
                 new MergingSharedSlotProfileRetrieverFactory(
-                        preferredLocationsRetriever, 
context::getPriorAllocationId);
+                        preferredLocationsRetriever,
+                        context::getPriorAllocationId,
+                        context::getReservedAllocations);
         return new SlotSharingExecutionSlotAllocator(
                 slotProvider,
                 slotWillBeOccupiedIndefinitely,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
index 7102a28..b8e6934 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -51,7 +52,7 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link 
org.apache.flink.runtime.scheduler.MergingSharedSlotProfileRetrieverFactory}.
  */
-public class MergingSharedSlotProfileRetrieverTest {
+public class MergingSharedSlotProfileRetrieverTest extends TestLogger {
 
     private static final SyncPreferredLocationsRetriever 
EMPTY_PREFERRED_LOCATIONS_RETRIEVER =
             (executionVertexId, producersToIgnore) -> Collections.emptyList();
@@ -61,7 +62,8 @@ public class MergingSharedSlotProfileRetrieverTest {
         SharedSlotProfileRetriever sharedSlotProfileRetriever =
                 new MergingSharedSlotProfileRetrieverFactory(
                                 EMPTY_PREFERRED_LOCATIONS_RETRIEVER,
-                                executionVertexID -> new AllocationID())
+                                executionVertexID -> new AllocationID(),
+                                () -> Collections.emptySet())
                         .createFromBulk(Collections.emptySet());
 
         SlotProfile slotProfile =
@@ -106,6 +108,7 @@ public class MergingSharedSlotProfileRetrieverTest {
         locations.put(executions.get(0), Arrays.asList(allLocations.get(0), 
allLocations.get(1)));
         locations.put(executions.get(1), Arrays.asList(allLocations.get(1), 
allLocations.get(2)));
 
+        List<AllocationID> prevAllocationIds = Collections.nCopies(3, new 
AllocationID());
         SlotProfile slotProfile =
                 getSlotProfile(
                         (executionVertexId, producersToIgnore) -> {
@@ -114,7 +117,8 @@ public class MergingSharedSlotProfileRetrieverTest {
                         },
                         executions,
                         ResourceProfile.ZERO,
-                        Collections.nCopies(3, new AllocationID()),
+                        prevAllocationIds,
+                        prevAllocationIds,
                         2);
 
         assertThat(
@@ -135,7 +139,8 @@ public class MergingSharedSlotProfileRetrieverTest {
     }
 
     @Test
-    public void testAllocationIdsOfSlotProfile() throws ExecutionException, 
InterruptedException {
+    public void testPreferredAllocationsOfSlotProfile()
+            throws ExecutionException, InterruptedException {
         AllocationID prevAllocationID1 = new AllocationID();
         AllocationID prevAllocationID2 = new AllocationID();
         List<AllocationID> prevAllocationIDs =
@@ -146,9 +151,26 @@ public class MergingSharedSlotProfileRetrieverTest {
         assertThat(
                 slotProfile.getPreferredAllocations(),
                 containsInAnyOrder(prevAllocationID1, prevAllocationID2));
+    }
+
+    @Test
+    public void testReservedAllocationsOfSlotProfile()
+            throws ExecutionException, InterruptedException {
+        List<AllocationID> reservedAllocationIds =
+                Arrays.asList(new AllocationID(), new AllocationID(), new 
AllocationID());
+
+        SlotProfile slotProfile =
+                getSlotProfile(
+                        EMPTY_PREFERRED_LOCATIONS_RETRIEVER,
+                        Collections.emptyList(),
+                        ResourceProfile.ZERO,
+                        Collections.emptyList(),
+                        reservedAllocationIds,
+                        0);
+
         assertThat(
                 slotProfile.getReservedAllocations(),
-                containsInAnyOrder(prevAllocationIDs.toArray()));
+                containsInAnyOrder(reservedAllocationIds.toArray()));
     }
 
     private static SlotProfile getSlotProfile(
@@ -165,6 +187,7 @@ public class MergingSharedSlotProfileRetrieverTest {
                 executions,
                 resourceProfile,
                 prevAllocationIDs,
+                prevAllocationIDs,
                 executionSlotSharingGroupSize);
     }
 
@@ -173,6 +196,7 @@ public class MergingSharedSlotProfileRetrieverTest {
             List<ExecutionVertexID> executions,
             ResourceProfile resourceProfile,
             List<AllocationID> prevAllocationIDs,
+            Collection<AllocationID> reservedAllocationIds,
             int executionSlotSharingGroupSize)
             throws ExecutionException, InterruptedException {
         SharedSlotProfileRetriever sharedSlotProfileRetriever =
@@ -180,7 +204,8 @@ public class MergingSharedSlotProfileRetrieverTest {
                                 preferredLocationsRetriever,
                                 executionVertexID ->
                                         prevAllocationIDs.get(
-                                                
executions.indexOf(executionVertexID)))
+                                                
executions.indexOf(executionVertexID)),
+                                () -> new HashSet<>(reservedAllocationIds))
                         .createFromBulk(new HashSet<>(executions));
 
         ExecutionSlotSharingGroup executionSlotSharingGroup = new 
ExecutionSlotSharingGroup();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
index 9d71af5..c72d957 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestExecutionSlotAllocatorFactory.java
@@ -27,6 +27,8 @@ public class TestExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocator
 
     private final TestExecutionSlotAllocator testExecutionSlotAllocator;
 
+    private ExecutionSlotAllocationContext 
latestExecutionSlotAllocationContext;
+
     public TestExecutionSlotAllocatorFactory() {
         this.testExecutionSlotAllocator = new TestExecutionSlotAllocator();
     }
@@ -41,10 +43,15 @@ public class TestExecutionSlotAllocatorFactory implements 
ExecutionSlotAllocator
 
     @Override
     public ExecutionSlotAllocator createInstance(final 
ExecutionSlotAllocationContext context) {
+        this.latestExecutionSlotAllocationContext = context;
         return testExecutionSlotAllocator;
     }
 
     public TestExecutionSlotAllocator getTestExecutionSlotAllocator() {
         return testExecutionSlotAllocator;
     }
+
+    public ExecutionSlotAllocationContext 
getLatestExecutionSlotAllocationContext() {
+        return latestExecutionSlotAllocationContext;
+    }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
new file mode 100644
index 0000000..2f16fe7
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/DefaultSchedulerLocalRecoveryITCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.scheduler.DefaultScheduler;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.WaitingCancelableInvokable;
+import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+
+/** IT case to test local recovery using {@link DefaultScheduler}. */
+public class DefaultSchedulerLocalRecoveryITCase extends TestLogger {
+
+    private static final long TIMEOUT = 10_000L;
+
+    @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21450
+    public void testLocalRecoveryFull() throws Exception {
+        testLocalRecoveryInternal("full");
+    }
+
+    @Test
+    @Category(FailsWithAdaptiveScheduler.class) // FLINK-21450
+    public void testLocalRecoveryRegion() throws Exception {
+        testLocalRecoveryInternal("region");
+    }
+
+    private void testLocalRecoveryInternal(String failoverStrategyValue) 
throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
+        configuration.setString(EXECUTION_FAILOVER_STRATEGY.key(), 
failoverStrategyValue);
+
+        final int parallelism = 10;
+        final ArchivedExecutionGraph graph = 
executeSchedulingTest(configuration, parallelism);
+        assertNonLocalRecoveredTasksEquals(graph, 1);
+    }
+
+    private void assertNonLocalRecoveredTasksEquals(ArchivedExecutionGraph 
graph, int expected) {
+        int nonLocalRecoveredTasks = 0;
+        for (ArchivedExecutionVertex vertex : graph.getAllExecutionVertices()) 
{
+            int currentAttemptNumber = 
vertex.getCurrentExecutionAttempt().getAttemptNumber();
+            if (currentAttemptNumber == 0) {
+                // the task has never restarted and does not need to recover
+                continue;
+            }
+            AllocationID priorAllocation =
+                    vertex.getPriorExecutionAttempt(currentAttemptNumber - 1)
+                            .getAssignedAllocationID();
+            AllocationID currentAllocation =
+                    
vertex.getCurrentExecutionAttempt().getAssignedAllocationID();
+
+            assertNotNull(priorAllocation);
+            assertNotNull(currentAllocation);
+            if (!currentAllocation.equals(priorAllocation)) {
+                nonLocalRecoveredTasks++;
+            }
+        }
+        assertThat(nonLocalRecoveredTasks, is(expected));
+    }
+
+    private ArchivedExecutionGraph executeSchedulingTest(
+            Configuration configuration, int parallelism) throws Exception {
+        configuration.setString(RestOptions.BIND_PORT, "0");
+
+        final long slotIdleTimeout = TIMEOUT;
+        configuration.setLong(JobManagerOptions.SLOT_IDLE_TIMEOUT, 
slotIdleTimeout);
+
+        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, 
MemorySize.parse("64mb"));
+        configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, 
MemorySize.parse("16mb"));
+        configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, 
MemorySize.parse("16mb"));
+
+        final MiniClusterConfiguration miniClusterConfiguration =
+                new MiniClusterConfiguration.Builder()
+                        .setConfiguration(configuration)
+                        .setNumTaskManagers(parallelism)
+                        .setNumSlotsPerTaskManager(1)
+                        .build();
+
+        try (MiniCluster miniCluster = new 
MiniCluster(miniClusterConfiguration)) {
+            miniCluster.start();
+
+            MiniClusterClient miniClusterClient = new 
MiniClusterClient(configuration, miniCluster);
+
+            JobGraph jobGraph = createJobGraph(parallelism);
+
+            // wait for the submission to succeed
+            JobID jobId = miniClusterClient.submitJob(jobGraph).get(TIMEOUT, 
TimeUnit.SECONDS);
+
+            // wait until all tasks are running before triggering task failures
+            waitUntilAllVerticesRunning(jobId, miniCluster);
+
+            // kill one TM to trigger task failure and remove one existing slot
+            CompletableFuture<Void> terminationFuture = 
miniCluster.terminateTaskManager(0);
+            terminationFuture.get();
+
+            // restart a taskmanager as a replacement for the killed one
+            miniCluster.startTaskManager();
+
+            // wait until all tasks are running again
+            waitUntilAllVerticesRunning(jobId, miniCluster);
+
+            ArchivedExecutionGraph graph =
+                    
miniCluster.getArchivedExecutionGraph(jobGraph.getJobID()).get();
+
+            miniCluster.cancelJob(jobId).get();
+
+            return graph;
+        }
+    }
+
+    private void waitUntilAllVerticesRunning(JobID jobId, MiniCluster 
miniCluster)
+            throws Exception {
+        CommonTestUtils.waitForAllTaskRunning(
+                () -> miniCluster.getExecutionGraph(jobId).get(TIMEOUT, 
TimeUnit.SECONDS),
+                Deadline.fromNow(Duration.ofMillis(TIMEOUT)),
+                false);
+    }
+
+    private JobGraph createJobGraph(int parallelism) throws IOException {
+        final JobVertex source = new JobVertex("v1");
+        source.setInvokableClass(WaitingCancelableInvokable.class);
+        source.setParallelism(parallelism);
+
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10));
+
+        return JobGraphBuilder.newStreamingJobGraphBuilder()
+                .addJobVertices(Arrays.asList(source))
+                .setExecutionConfig(executionConfig)
+                .build();
+    }
+}

Reply via email to