This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e38a6709b57000e38bf044d0e55da3dd3ec3bde8
Author: David Moravek <[email protected]>
AuthorDate: Fri Jan 21 10:09:53 2022 +0100

    [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler
---
 .../scheduler/adaptive/AdaptiveScheduler.java      |  16 +-
 .../scheduler/adaptive/JobGraphJobInformation.java |   5 +
 .../adaptive/allocator/DefaultSlotAssigner.java    |  77 ++++++++
 .../allocator/JobAllocationsInformation.java       | 112 ++++++++++++
 .../adaptive/allocator/JobInformation.java         |   2 +
 .../adaptive/allocator/SlotAllocator.java          |   4 +-
 .../{JobInformation.java => SlotAssigner.java}     |  36 ++--
 .../allocator/SlotSharingSlotAllocator.java        |  70 +++----
 .../allocator/StateLocalitySlotAssigner.java       | 201 +++++++++++++++++++++
 .../allocator/SlotSharingSlotAllocatorTest.java    | 161 +++++++++--------
 .../allocator/StateLocalitySlotAssignerTest.java   | 166 +++++++++++++++++
 .../adaptive/allocator/TestJobInformation.java     |  59 ++++++
 .../scheduler/adaptive/allocator/TestSlotInfo.java |  10 +-
 .../adaptive/allocator/TestVertexInformation.java} |  49 ++---
 .../adaptive/allocator/TestingSlotAllocator.java   |  31 +---
 15 files changed, 802 insertions(+), 197 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index a876bb65eb2..2446d41c5c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -96,6 +96,7 @@ import org.apache.flink.runtime.scheduler.SchedulerUtils;
 import 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
@@ -760,12 +761,15 @@ public class AdaptiveScheduler
                 .isPresent();
     }
 
-    private JobSchedulingPlan determineParallelism(SlotAllocator slotAllocator)
+    private JobSchedulingPlan determineParallelism(
+            SlotAllocator slotAllocator, @Nullable ExecutionGraph 
previousExecutionGraph)
             throws NoResourceAvailableException {
 
         return slotAllocator
                 .determineParallelismAndCalculateAssignment(
-                        jobInformation, 
declarativeSlotPool.getFreeSlotsInformation())
+                        jobInformation,
+                        declarativeSlotPool.getFreeSlotsInformation(),
+                        
JobAllocationsInformation.fromGraph(previousExecutionGraph))
                 .orElseThrow(
                         () ->
                                 new NoResourceAvailableException(
@@ -921,8 +925,7 @@ public class AdaptiveScheduler
     public void goToCreatingExecutionGraph(@Nullable ExecutionGraph 
previousExecutionGraph) {
         final 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
                 executionGraphWithAvailableResourcesFuture =
-                        createExecutionGraphWithAvailableResourcesAsync();
-
+                        
createExecutionGraphWithAvailableResourcesAsync(previousExecutionGraph);
         transitionToState(
                 new CreatingExecutionGraph.Factory(
                         this,
@@ -932,12 +935,13 @@ public class AdaptiveScheduler
     }
 
     private 
CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism>
-            createExecutionGraphWithAvailableResourcesAsync() {
+            createExecutionGraphWithAvailableResourcesAsync(
+                    @Nullable ExecutionGraph previousExecutionGraph) {
         final JobSchedulingPlan schedulingPlan;
         final VertexParallelismStore adjustedParallelismStore;
 
         try {
-            schedulingPlan = determineParallelism(slotAllocator);
+            schedulingPlan = determineParallelism(slotAllocator, 
previousExecutionGraph);
             JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
 
             for (JobVertex vertex : adjustedJobGraph.getVertices()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
index 1b0bb60e7e6..2111b6e9c07 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java
@@ -106,6 +106,11 @@ public class JobGraphJobInformation implements 
JobInformation {
             return parallelismInfo.getParallelism();
         }
 
+        @Override
+        public int getMaxParallelism() {
+            return parallelismInfo.getMaxParallelism();
+        }
+
         @Override
         public SlotSharingGroup getSlotSharingGroup() {
             return jobVertex.getSlotSharingGroup();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
new file mode 100644
index 00000000000..0a8813b33a6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups 
equally. */
+public class DefaultSlotAssigner implements SlotAssigner {
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            JobAllocationsInformation previousAllocations) {
+        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
+        }
+
+        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
+        Collection<SlotAssignment> assignments = new ArrayList<>();
+        for (ExecutionSlotSharingGroup group : allGroups) {
+            assignments.add(new SlotAssignment(iterator.next(), group));
+        }
+        return assignments;
+    }
+
+    static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(
+            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
+        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
+        slotSharingGroup
+                .getJobVertexIds()
+                .forEach(
+                        jobVertexId -> {
+                            int parallelism = 
vertexParallelism.getParallelism(jobVertexId);
+                            for (int subtaskIdx = 0; subtaskIdx < parallelism; 
subtaskIdx++) {
+                                sharedSlotToVertexAssignment
+                                        .computeIfAbsent(subtaskIdx, ignored 
-> new HashSet<>())
+                                        .add(new 
ExecutionVertexID(jobVertexId, subtaskIdx));
+                            }
+                        });
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java
new file mode 100644
index 00000000000..e17d242d342
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+
+/** Information about allocations of Job Vertices. */
+@Internal
+public class JobAllocationsInformation {
+
+    private final Map<JobVertexID, List<VertexAllocationInformation>> 
vertexAllocations;
+
+    JobAllocationsInformation(
+            Map<JobVertexID, List<VertexAllocationInformation>> 
vertexAllocations) {
+        this.vertexAllocations = vertexAllocations;
+    }
+
+    public static JobAllocationsInformation fromGraph(@Nullable ExecutionGraph 
graph) {
+        return graph == null ? empty() : new 
JobAllocationsInformation(calculateAllocations(graph));
+    }
+
+    public List<VertexAllocationInformation> getAllocations(JobVertexID 
jobVertexID) {
+        return vertexAllocations.getOrDefault(jobVertexID, emptyList());
+    }
+
+    private static Map<JobVertexID, List<VertexAllocationInformation>> 
calculateAllocations(
+            ExecutionGraph graph) {
+        final Map<JobVertexID, List<VertexAllocationInformation>> allocations 
= new HashMap<>();
+        for (ExecutionJobVertex vertex : graph.getVerticesTopologically()) {
+            JobVertexID jobVertexId = vertex.getJobVertexId();
+            for (ExecutionVertex executionVertex : vertex.getTaskVertices()) {
+                AllocationID allocationId =
+                        
executionVertex.getCurrentExecutionAttempt().getAssignedAllocationID();
+                KeyGroupRange kgr =
+                        
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+                                vertex.getMaxParallelism(),
+                                vertex.getParallelism(),
+                                executionVertex.getParallelSubtaskIndex());
+                allocations
+                        .computeIfAbsent(jobVertexId, ignored -> new 
ArrayList<>())
+                        .add(new VertexAllocationInformation(allocationId, 
jobVertexId, kgr));
+            }
+        }
+        return allocations;
+    }
+
+    public static JobAllocationsInformation empty() {
+        return new JobAllocationsInformation(emptyMap());
+    }
+
+    public boolean isEmpty() {
+        return vertexAllocations.isEmpty();
+    }
+
+    /** Information about the allocation of a single subtask of a Job Vertex. */
+    public static class VertexAllocationInformation {
+        private final AllocationID allocationID;
+        private final JobVertexID jobVertexID;
+        private final KeyGroupRange keyGroupRange;
+
+        public VertexAllocationInformation(
+                AllocationID allocationID, JobVertexID jobVertexID, 
KeyGroupRange keyGroupRange) {
+            this.allocationID = allocationID;
+            this.jobVertexID = jobVertexID;
+            this.keyGroupRange = keyGroupRange;
+        }
+
+        public AllocationID getAllocationID() {
+            return allocationID;
+        }
+
+        public JobVertexID getJobVertexID() {
+            return jobVertexID;
+        }
+
+        public KeyGroupRange getKeyGroupRange() {
+            return keyGroupRange;
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
index 99b8d48c051..b1ff9183b6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
@@ -44,6 +44,8 @@ public interface JobInformation {
 
         int getParallelism();
 
+        int getMaxParallelism();
+
         SlotSharingGroup getSlotSharingGroup();
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
index fea0d659ad3..629a57f5122 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
@@ -60,7 +60,9 @@ public interface SlotAllocator {
      * assignment of slots to execution slot sharing groups.
      */
     Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
-            JobInformation jobInformation, Collection<? extends SlotInfo> 
slots);
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> slots,
+            JobAllocationsInformation jobAllocationsInformation);
 
     /**
      * Reserves slots according to the given assignment if possible. If the 
underlying set of
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
similarity index 52%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
index 99b8d48c051..cb264e28d88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java
@@ -17,33 +17,19 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
 
 import java.util.Collection;
 
-/** Information about the job. */
-public interface JobInformation {
-    /**
-     * Returns all slot-sharing groups of the job.
-     *
-     * <p>Attention: The returned slot sharing groups should never be modified 
(they are indeed
-     * mutable)!
-     *
-     * @return all slot-sharing groups of the job
-     */
-    Collection<SlotSharingGroup> getSlotSharingGroups();
+/** Interface for assigning slots to slot sharing groups. */
+@Internal
+public interface SlotAssigner {
 
-    VertexInformation getVertexInformation(JobVertexID jobVertexId);
-
-    Iterable<VertexInformation> getVertices();
-
-    /** Information about a single vertex. */
-    interface VertexInformation {
-        JobVertexID getJobVertexID();
-
-        int getParallelism();
-
-        SlotSharingGroup getSlotSharingGroup();
-    }
+    Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            JobAllocationsInformation previousAllocations);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 2b7a9c362ad..3d0f30b69e2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -37,12 +37,11 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /** {@link SlotAllocator} implementation that supports slot sharing. */
@@ -125,30 +124,24 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
 
     @Override
     public Optional<JobSchedulingPlan> 
determineParallelismAndCalculateAssignment(
-            JobInformation jobInformation, Collection<? extends SlotInfo> 
slots) {
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> slots,
+            JobAllocationsInformation jobAllocationsInformation) {
         return determineParallelism(jobInformation, slots)
                 .map(
-                        parallelism ->
-                                new JobSchedulingPlan(
-                                        parallelism,
-                                        assignSlots(jobInformation, slots, 
parallelism)));
-    }
-
-    private Collection<SlotAssignment> assignSlots(
-            JobInformation jobInformation,
-            Collection<? extends SlotInfo> freeSlots,
-            VertexParallelism vertexParallelism) {
-        List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
-        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
-            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
-        }
-
-        Iterator<? extends SlotInfo> iterator = freeSlots.iterator();
-        Collection<SlotAssignment> assignments = new ArrayList<>();
-        for (ExecutionSlotSharingGroup group : allGroups) {
-            assignments.add(new SlotAssignment(iterator.next(), group));
-        }
-        return assignments;
+                        parallelism -> {
+                            SlotAssigner slotAssigner =
+                                    jobAllocationsInformation.isEmpty()
+                                            ? new DefaultSlotAssigner()
+                                            : new StateLocalitySlotAssigner();
+                            return new JobSchedulingPlan(
+                                    parallelism,
+                                    slotAssigner.assignSlots(
+                                            jobInformation,
+                                            slots,
+                                            parallelism,
+                                            jobAllocationsInformation));
+                        });
     }
 
     /**
@@ -200,24 +193,6 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
         return vertexParallelism;
     }
 
-    private static List<ExecutionSlotSharingGroup> 
createExecutionSlotSharingGroups(
-            VertexParallelism vertexParallelism, SlotSharingGroup 
slotSharingGroup) {
-        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
-
-        for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) {
-            int parallelism = vertexParallelism.getParallelism(jobVertexId);
-            for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) {
-                sharedSlotToVertexAssignment
-                        .computeIfAbsent(subtaskIdx, ignored -> new 
HashSet<>())
-                        .add(new ExecutionVertexID(jobVertexId, subtaskIdx));
-            }
-        }
-
-        return sharedSlotToVertexAssignment.values().stream()
-                .map(ExecutionSlotSharingGroup::new)
-                .collect(Collectors.toList());
-    }
-
     @Override
     public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan 
jobSchedulingPlan) {
         final Collection<AllocationID> expectedSlots =
@@ -278,10 +253,21 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator {
     }
 
     static class ExecutionSlotSharingGroup {
+        private final String id;
         private final Set<ExecutionVertexID> containedExecutionVertices;
 
         public ExecutionSlotSharingGroup(Set<ExecutionVertexID> 
containedExecutionVertices) {
+            this(containedExecutionVertices, UUID.randomUUID().toString());
+        }
+
+        public ExecutionSlotSharingGroup(
+                Set<ExecutionVertexID> containedExecutionVertices, String id) {
             this.containedExecutionVertices = containedExecutionVertices;
+            this.id = id;
+        }
+
+        public String getId() {
+            return id;
         }
 
         public Collection<ExecutionVertexID> getContainedExecutionVertices() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
new file mode 100644
index 00000000000..4e2a807c062
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** A {@link SlotAssigner} that assigns slots based on the number of local key 
groups. */
+@Internal
+public class StateLocalitySlotAssigner implements SlotAssigner {
+
+    private static class AllocationScore implements 
Comparable<AllocationScore> {
+
+        private final String groupId;
+        private final AllocationID allocationId;
+
+        public AllocationScore(String groupId, AllocationID allocationId, long 
score) {
+            this.groupId = groupId;
+            this.allocationId = allocationId;
+            this.score = score;
+        }
+
+        private final long score;
+
+        public String getGroupId() {
+            return groupId;
+        }
+
+        public AllocationID getAllocationId() {
+            return allocationId;
+        }
+
+        public long getScore() {
+            return score;
+        }
+
+        @Override
+        public int compareTo(StateLocalitySlotAssigner.AllocationScore other) {
+            int result = Long.compare(score, other.score);
+            if (result != 0) {
+                return result;
+            }
+            result = other.allocationId.compareTo(allocationId);
+            if (result != 0) {
+                return result;
+            }
+            return other.groupId.compareTo(groupId);
+        }
+    }
+
+    @Override
+    public Collection<SlotAssignment> assignSlots(
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> freeSlots,
+            VertexParallelism vertexParallelism,
+            JobAllocationsInformation previousAllocations) {
+        checkState(
+                freeSlots.size() >= 
jobInformation.getSlotSharingGroups().size(),
+                "Not enough slots to allocate all the slot sharing groups 
(have: %s, need: %s)",
+                freeSlots.size(),
+                jobInformation.getSlotSharingGroups().size());
+
+        final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>();
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            
allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, 
slotSharingGroup));
+        }
+        final Map<JobVertexID, Integer> parallelism = 
getParallelism(allGroups);
+        final PriorityQueue<AllocationScore> scores =
+                calculateScores(jobInformation, previousAllocations, 
allGroups, parallelism);
+
+        final Map<String, ExecutionSlotSharingGroup> groupsById =
+                
allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity()));
+        final Map<AllocationID, SlotInfo> slotsById =
+                freeSlots.stream().collect(toMap(SlotInfo::getAllocationId, 
identity()));
+        AllocationScore score;
+        final Collection<SlotAssignment> assignments = new ArrayList<>();
+        while ((score = scores.poll()) != null) {
+            if (slotsById.containsKey(score.getAllocationId())
+                    && groupsById.containsKey(score.getGroupId())) {
+                assignments.add(
+                        new SlotAssignment(
+                                slotsById.remove(score.getAllocationId()),
+                                groupsById.remove(score.getGroupId())));
+            }
+        }
+        // Assign the remaining free slots to the groups that are still unassigned
+        Iterator<? extends SlotInfo> remainingSlots = 
slotsById.values().iterator();
+        for (ExecutionSlotSharingGroup group : groupsById.values()) {
+            checkState(
+                    remainingSlots.hasNext(),
+                    "No slots available for group %s (%s more in total). This 
is likely a bug.",
+                    group,
+                    groupsById.size());
+            assignments.add(new SlotAssignment(remainingSlots.next(), group));
+            remainingSlots.remove();
+        }
+
+        return assignments;
+    }
+
+    @Nonnull
+    private PriorityQueue<AllocationScore> calculateScores(
+            JobInformation jobInformation,
+            JobAllocationsInformation previousAllocations,
+            List<ExecutionSlotSharingGroup> allGroups,
+            Map<JobVertexID, Integer> parallelism) {
+        // PQ orders the pairs (allocationID, groupID) by decreasing score;
+        // the score is the number of key groups whose state would reside locally
+        final PriorityQueue<AllocationScore> scores =
+                new PriorityQueue<>(Comparator.reverseOrder());
+        for (ExecutionSlotSharingGroup group : allGroups) {
+            scores.addAll(calculateScore(group, parallelism, jobInformation, 
previousAllocations));
+        }
+        return scores;
+    }
+
+    private static Map<JobVertexID, Integer> getParallelism(
+            List<ExecutionSlotSharingGroup> groups) {
+        final Map<JobVertexID, Integer> parallelism = new HashMap<>();
+        for (ExecutionSlotSharingGroup group : groups) {
+            for (ExecutionVertexID evi : 
group.getContainedExecutionVertices()) {
+                parallelism.merge(evi.getJobVertexId(), 1, Integer::sum);
+            }
+        }
+        return parallelism;
+    }
+
+    public Collection<AllocationScore> calculateScore(
+            ExecutionSlotSharingGroup group,
+            Map<JobVertexID, Integer> parallelism,
+            JobInformation jobInformation,
+            JobAllocationsInformation previousAllocations) {
+        final Map<AllocationID, Long> score = new HashMap<>();
+        for (ExecutionVertexID evi : group.getContainedExecutionVertices()) {
+            final KeyGroupRange kgr =
+                    
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+                            jobInformation
+                                    .getVertexInformation(evi.getJobVertexId())
+                                    .getMaxParallelism(),
+                            parallelism.get(evi.getJobVertexId()),
+                            evi.getSubtaskIndex());
+            previousAllocations
+                    .getAllocations(evi.getJobVertexId())
+                    .forEach(
+                            allocation -> {
+                                long value =
+                                        allocation
+                                                .getKeyGroupRange()
+                                                .getIntersection(kgr)
+                                                .getNumberOfKeyGroups();
+                                if (value > 0) {
+                                    score.merge(allocation.getAllocationID(), 
value, Long::sum);
+                                }
+                            });
+        }
+
+        return score.entrySet().stream()
+                .map(e -> new AllocationScore(group.getId(), e.getKey(), 
e.getValue()))
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
index 7643acc7410..9a6b1cab5c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.allocator;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -24,25 +25,31 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.topology.VertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.util.TestLogger;
 
 import org.assertj.core.api.Assertions;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import java.util.Set;
+import java.util.stream.IntStream;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 
@@ -79,12 +86,11 @@ public class SlotSharingSlotAllocatorTest extends TestLogger {
         final ResourceCounter resourceCounter =
                 slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3));
 
-        assertThat(resourceCounter.getResources(), contains(ResourceProfile.UNKNOWN));
-        assertThat(
-                resourceCounter.getResourceCount(ResourceProfile.UNKNOWN),
-                is(
+        assertThat(resourceCounter.getResources()).contains(ResourceProfile.UNKNOWN);
+        assertThat(resourceCounter.getResourceCount(ResourceProfile.UNKNOWN))
+                .isEqualTo(
                         Math.max(vertex1.getParallelism(), vertex2.getParallelism())
-                                + vertex3.getParallelism()));
+                                + vertex3.getParallelism());
     }
 
     @Test
@@ -194,7 +200,8 @@ public class SlotSharingSlotAllocatorTest extends TestLogger {
 
         final JobSchedulingPlan jobSchedulingPlan =
                 slotAllocator
-                        .determineParallelismAndCalculateAssignment(jobInformation, getSlots(50))
+                        .determineParallelismAndCalculateAssignment(
+                                jobInformation, getSlots(50), JobAllocationsInformation.empty())
                         .get();
 
         final ReservedSlots reservedSlots =
@@ -234,7 +241,8 @@ public class SlotSharingSlotAllocatorTest extends TestLogger {
 
         JobSchedulingPlan jobSchedulingPlan =
                 slotSharingSlotAllocator
-                        .determineParallelismAndCalculateAssignment(jobInformation, getSlots(50))
+                        .determineParallelismAndCalculateAssignment(
+                                jobInformation, getSlots(50), JobAllocationsInformation.empty())
                         .get();
 
         final Optional<? extends ReservedSlots> reservedSlots =
@@ -243,6 +251,73 @@ public class SlotSharingSlotAllocatorTest extends TestLogger {
         assertFalse(reservedSlots.isPresent());
     }
 
+    /**
+     * Basic test to verify that allocation takes previous allocations into account to facilitate
+     * Local Recovery.
+     */
+    @Test
+    public void testStickyAllocation() {
+        Map<JobVertexID, List<VertexAllocationInformation>> locality = new HashMap<>();
+
+        // previous allocation allocation1: v1, v2
+        AllocationID allocation1 = new AllocationID();
+        locality.put(
+                vertex1.getJobVertexID(),
+                Collections.singletonList(
+                        new VertexAllocationInformation(
+                                allocation1, vertex1.getJobVertexID(), KeyGroupRange.of(1, 100))));
+        locality.put(
+                vertex2.getJobVertexID(),
+                Collections.singletonList(
+                        new VertexAllocationInformation(
+                                allocation1, vertex2.getJobVertexID(), KeyGroupRange.of(1, 100))));
+
+        // previous allocation allocation2: v3
+        AllocationID allocation2 = new AllocationID();
+        locality.put(
+                vertex3.getJobVertexID(),
+                Collections.singletonList(
+                        new VertexAllocationInformation(
+                                allocation2, vertex3.getJobVertexID(), KeyGroupRange.of(1, 100))));
+
+        List<SlotInfo> freeSlots = new ArrayList<>();
+        IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestSlotInfo(new AllocationID())));
+        freeSlots.add(new TestSlotInfo(allocation1));
+        freeSlots.add(new TestSlotInfo(allocation2));
+
+        Map<JobVertexID, Long> stateSizes = new HashMap<>();
+        stateSizes.put(vertex1.getJobVertexID(), 10L);
+        stateSizes.put(vertex2.getJobVertexID(), 10L);
+        stateSizes.put(vertex3.getJobVertexID(), 10L);
+
+        JobSchedulingPlan schedulingPlan =
+                SlotSharingSlotAllocator.createSlotSharingSlotAllocator(
+                                (allocationId, resourceProfile) ->
+                                        TestingPhysicalSlot.builder().build(),
+                                (allocationID, cause, ts) -> {},
+                                id -> false)
+                        .determineParallelismAndCalculateAssignment(
+                                new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)),
+                                freeSlots,
+                                new JobAllocationsInformation(locality))
+                        .get();
+
+        Map<AllocationID, Set<VertexID>> allocated = new HashMap<>();
+        for (JobSchedulingPlan.SlotAssignment assignment : schedulingPlan.getSlotAssignments()) {
+            ExecutionSlotSharingGroup target =
+                    assignment.getTargetAs(ExecutionSlotSharingGroup.class);
+            Set<VertexID> set =
+                    allocated.computeIfAbsent(
+                            assignment.getSlotInfo().getAllocationId(), ign -> new HashSet<>());
+            for (ExecutionVertexID id : target.getContainedExecutionVertices()) {
+                set.add(id.getJobVertexId());
+            }
+        }
+        assertThat(allocated.get(allocation1)).contains(vertex1.getJobVertexID());
+        assertThat(allocated.get(allocation1)).contains(vertex2.getJobVertexID());
+        assertThat(allocated.get(allocation2)).contains(vertex3.getJobVertexID());
+    }
+
     private static Collection<SlotInfo> getSlots(int count) {
         final Collection<SlotInfo> slotInfo = new ArrayList<>();
         for (int i = 0; i < count; i++) {
@@ -250,68 +325,4 @@ public class SlotSharingSlotAllocatorTest extends TestLogger {
         }
         return slotInfo;
     }
-
-    private static class TestJobInformation implements JobInformation {
-
-        private final Map<JobVertexID, VertexInformation> vertexIdToInformation;
-        private final Collection<SlotSharingGroup> slotSharingGroups;
-
-        private TestJobInformation(Collection<VertexInformation> vertexIdToInformation) {
-            this.vertexIdToInformation =
-                    vertexIdToInformation.stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            VertexInformation::getJobVertexID,
-                                            Function.identity()));
-            this.slotSharingGroups =
-                    vertexIdToInformation.stream()
-                            .map(VertexInformation::getSlotSharingGroup)
-                            .collect(Collectors.toSet());
-        }
-
-        @Override
-        public Collection<SlotSharingGroup> getSlotSharingGroups() {
-            return slotSharingGroups;
-        }
-
-        @Override
-        public VertexInformation getVertexInformation(JobVertexID jobVertexId) {
-            return vertexIdToInformation.get(jobVertexId);
-        }
-
-        @Override
-        public Iterable<VertexInformation> getVertices() {
-            return vertexIdToInformation.values();
-        }
-    }
-
-    private static class TestVertexInformation implements JobInformation.VertexInformation {
-
-        private final JobVertexID jobVertexId;
-        private final int parallelism;
-        private final SlotSharingGroup slotSharingGroup;
-
-        private TestVertexInformation(
-                JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) {
-            this.jobVertexId = jobVertexId;
-            this.parallelism = parallelism;
-            this.slotSharingGroup = slotSharingGroup;
-            slotSharingGroup.addVertexToGroup(jobVertexId);
-        }
-
-        @Override
-        public JobVertexID getJobVertexID() {
-            return jobVertexId;
-        }
-
-        @Override
-        public int getParallelism() {
-            return parallelism;
-        }
-
-        @Override
-        public SlotSharingGroup getSlotSharingGroup() {
-            return slotSharingGroup;
-        }
-    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
new file mode 100644
index 00000000000..aa9ea5a6bf9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation.VertexInformation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.hamcrest.Matchers.hasItems;
+import static org.hamcrest.Matchers.hasSize;
+
+/** {@link StateLocalitySlotAssigner} test. */
+class StateLocalitySlotAssignerTest {
+    @Test
+    public void testSlotsAreNotWasted() {
+        VertexInformation vertex = createVertex(2);
+        AllocationID alloc1 = new AllocationID();
+        AllocationID alloc2 = new AllocationID();
+
+        List<VertexAllocationInformation> allocations =
+                Arrays.asList(
+                        new VertexAllocationInformation(
+                                alloc1, vertex.getJobVertexID(), KeyGroupRange.of(0, 9)),
+                        new VertexAllocationInformation(
+                                alloc2, vertex.getJobVertexID(), KeyGroupRange.of(10, 19)));
+
+        assign(vertex, Arrays.asList(alloc1, alloc2), allocations);
+    }
+
+    @Test
+    public void testUpScaling() {
+        final int oldParallelism = 3;
+        final int newParallelism = 7;
+        final int numFreeSlots = 100;
+        final VertexInformation vertex = createVertex(newParallelism);
+        final List<AllocationID> allocationIDs = createAllocationIDS(numFreeSlots);
+
+        List<VertexAllocationInformation> prevAllocations = new ArrayList<>();
+        Iterator<AllocationID> iterator = allocationIDs.iterator();
+        for (int i = 0; i < oldParallelism; i++) {
+            prevAllocations.add(
+                    new VertexAllocationInformation(
+                            iterator.next(),
+                            vertex.getJobVertexID(),
+                            KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
+                                    vertex.getMaxParallelism(), oldParallelism, i)));
+        }
+
+        Collection<SlotAssignment> assignments = assign(vertex, allocationIDs, prevAllocations);
+
+        verifyAssignments(
+                assignments,
+                newParallelism,
+                prevAllocations.stream()
+                        .map(VertexAllocationInformation::getAllocationID)
+                        .toArray(AllocationID[]::new));
+    }
+
+    @Test
+    public void testDownScaling() {
+        final int oldParallelism = 5;
+        final int newParallelism = 1;
+        final int numFreeSlots = 100;
+        final VertexInformation vertex = createVertex(newParallelism);
+        final List<AllocationID> allocationIDs = createAllocationIDS(numFreeSlots);
+
+        // pretend that the 1st (0) subtask had half of key groups ...
+        final Iterator<AllocationID> iterator = allocationIDs.iterator();
+        final AllocationID biggestAllocation = iterator.next();
+        final List<VertexAllocationInformation> prevAllocations = new ArrayList<>();
+        final int halfOfKeyGroupRange = vertex.getMaxParallelism() / 2;
+        prevAllocations.add(
+                new VertexAllocationInformation(
+                        biggestAllocation,
+                        vertex.getJobVertexID(),
+                        KeyGroupRange.of(0, halfOfKeyGroupRange - 1)));
+
+        // and the remaining subtasks had only one key group each
+        for (int subtaskIdx = 1; subtaskIdx < oldParallelism; subtaskIdx++) {
+            int keyGroup = halfOfKeyGroupRange + subtaskIdx;
+            prevAllocations.add(
+                    new VertexAllocationInformation(
+                            iterator.next(),
+                            vertex.getJobVertexID(),
+                            KeyGroupRange.of(keyGroup, keyGroup)));
+        }
+
+        Collection<SlotAssignment> assignments = assign(vertex, allocationIDs, prevAllocations);
+
+        verifyAssignments(assignments, newParallelism, biggestAllocation);
+    }
+
+    private static void verifyAssignments(
+            Collection<SlotAssignment> assignments,
+            int expectedSize,
+            AllocationID... mustHaveAllocationID) {
+        MatcherAssert.assertThat(assignments, hasSize(expectedSize));
+        MatcherAssert.assertThat(
+                assignments.stream()
+                        .map(e -> e.getSlotInfo().getAllocationId())
+                        .collect(Collectors.toSet()),
+                hasItems(mustHaveAllocationID));
+    }
+
+    private static Collection<SlotAssignment> assign(
+            VertexInformation vertexInformation,
+            List<AllocationID> allocationIDs,
+            List<VertexAllocationInformation> allocations) {
+        return new StateLocalitySlotAssigner()
+                .assignSlots(
+                        new TestJobInformation(singletonList(vertexInformation)),
+                        allocationIDs.stream().map(TestSlotInfo::new).collect(Collectors.toList()),
+                        new VertexParallelism(
+                                singletonMap(
+                                        vertexInformation.getJobVertexID(),
+                                        vertexInformation.getParallelism())),
+                        new JobAllocationsInformation(
+                                singletonMap(vertexInformation.getJobVertexID(), allocations)));
+    }
+
+    private static VertexInformation createVertex(int parallelism) {
+        JobVertexID id = new JobVertexID();
+        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+        slotSharingGroup.addVertexToGroup(id);
+        return new TestVertexInformation(id, parallelism, slotSharingGroup);
+    }
+
+    private static List<AllocationID> createAllocationIDS(int numFreeSlots) {
+        return IntStream.range(0, numFreeSlots)
+                .mapToObj(i -> new AllocationID())
+                .collect(Collectors.toList());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java
new file mode 100644
index 00000000000..b9a3671d3c4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+class TestJobInformation implements JobInformation {
+
+    private final Map<JobVertexID, VertexInformation> vertexIdToInformation;
+    private final Collection<SlotSharingGroup> slotSharingGroups;
+
+    TestJobInformation(Collection<? extends VertexInformation> vertexIdToInformation) {
+        this.vertexIdToInformation =
+                vertexIdToInformation.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        VertexInformation::getJobVertexID, Function.identity()));
+        this.slotSharingGroups =
+                vertexIdToInformation.stream()
+                        .map(VertexInformation::getSlotSharingGroup)
+                        .collect(Collectors.toSet());
+    }
+
+    @Override
+    public Collection<SlotSharingGroup> getSlotSharingGroups() {
+        return slotSharingGroups;
+    }
+
+    @Override
+    public VertexInformation getVertexInformation(JobVertexID jobVertexId) {
+        return vertexIdToInformation.get(jobVertexId);
+    }
+
+    @Override
+    public Iterable<VertexInformation> getVertices() {
+        return vertexIdToInformation.values();
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
index b023e5edf49..e467554ddfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java
@@ -26,7 +26,15 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 /** Test {@link SlotInfo} implementation. */
 class TestSlotInfo implements SlotInfo {
 
-    private final AllocationID allocationId = new AllocationID();
+    private final AllocationID allocationId;
+
+    public TestSlotInfo() {
+        this(new AllocationID());
+    }
+
+    public TestSlotInfo(AllocationID allocationId) {
+        this.allocationId = allocationId;
+    }
 
     @Override
     public AllocationID getAllocationId() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
similarity index 53%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
index 99b8d48c051..4666ba20647 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java
@@ -20,30 +20,37 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 
-import java.util.Collection;
-
-/** Information about the job. */
-public interface JobInformation {
-    /**
-     * Returns all slot-sharing groups of the job.
-     *
-     * <p>Attention: The returned slot sharing groups should never be modified (they are indeed
-     * mutable)!
-     *
-     * @return all slot-sharing groups of the job
-     */
-    Collection<SlotSharingGroup> getSlotSharingGroups();
-
-    VertexInformation getVertexInformation(JobVertexID jobVertexId);
+class TestVertexInformation implements JobInformation.VertexInformation {
+
+    private final JobVertexID jobVertexId;
+    private final int parallelism;
+    private final SlotSharingGroup slotSharingGroup;
+
+    TestVertexInformation(
+            JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) {
+        this.jobVertexId = jobVertexId;
+        this.parallelism = parallelism;
+        this.slotSharingGroup = slotSharingGroup;
+        slotSharingGroup.addVertexToGroup(jobVertexId);
+    }
 
-    Iterable<VertexInformation> getVertices();
+    @Override
+    public JobVertexID getJobVertexID() {
+        return jobVertexId;
+    }
 
-    /** Information about a single vertex. */
-    interface VertexInformation {
-        JobVertexID getJobVertexID();
+    @Override
+    public int getParallelism() {
+        return parallelism;
+    }
 
-        int getParallelism();
+    @Override
+    public int getMaxParallelism() {
+        return 128;
+    }
 
-        SlotSharingGroup getSlotSharingGroup();
+    @Override
+    public SlotSharingGroup getSlotSharingGroup() {
+        return slotSharingGroup;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
index 44b73122ae9..0fd638c43bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.util.ResourceCounter;
 
 import java.util.Collection;
 import java.util.Optional;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 
 /** Testing implementation of {@link SlotAllocator}. */
@@ -33,20 +32,13 @@ public class TestingSlotAllocator implements SlotAllocator {
     private final Function<Iterable<JobInformation.VertexInformation>, ResourceCounter>
             calculateRequiredSlotsFunction;
 
-    private final BiFunction<
-                    JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>>
-            determineParallelismFunction;
-
     private final Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction;
 
     private TestingSlotAllocator(
             Function<Iterable<JobInformation.VertexInformation>, ResourceCounter>
                     calculateRequiredSlotsFunction,
-            BiFunction<JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>>
-                    determineParallelismFunction,
             Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) {
         this.calculateRequiredSlotsFunction = calculateRequiredSlotsFunction;
-        this.determineParallelismFunction = determineParallelismFunction;
         this.tryReserveResourcesFunction = tryReserveResourcesFunction;
     }
 
@@ -59,12 +51,14 @@ public class TestingSlotAllocator implements SlotAllocator {
     @Override
     public Optional<VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> slots) {
-        return determineParallelismFunction.apply(jobInformation, slots);
+        return Optional.empty();
     }
 
     @Override
     public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment(
-            JobInformation jobInformation, Collection<? extends SlotInfo> slots) {
+            JobInformation jobInformation,
+            Collection<? extends SlotInfo> slots,
+            JobAllocationsInformation jobAllocationsInformation) {
         return Optional.empty();
     }
 
@@ -81,9 +75,6 @@ public class TestingSlotAllocator implements SlotAllocator {
     public static final class Builder {
         private Function<Iterable<JobInformation.VertexInformation>, ResourceCounter>
                 calculateRequiredSlotsFunction = ignored -> ResourceCounter.empty();
-        private BiFunction<
-                        JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>>
-                determineParallelismFunction = (ignoredA, ignoredB) -> Optional.empty();
         private Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction =
                 ignored -> Optional.empty();
 
@@ -94,16 +85,6 @@ public class TestingSlotAllocator implements SlotAllocator {
             return this;
         }
 
-        public Builder setDetermineParallelismFunction(
-                BiFunction<
-                                JobInformation,
-                                Collection<? extends SlotInfo>,
-                                Optional<VertexParallelism>>
-                        determineParallelismFunction) {
-            this.determineParallelismFunction = determineParallelismFunction;
-            return this;
-        }
-
         public Builder setTryReserveResourcesFunction(
                 Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) {
             this.tryReserveResourcesFunction = tryReserveResourcesFunction;
@@ -112,9 +93,7 @@ public class TestingSlotAllocator implements SlotAllocator {
 
         public TestingSlotAllocator build() {
             return new TestingSlotAllocator(
-                    calculateRequiredSlotsFunction,
-                    determineParallelismFunction,
-                    tryReserveResourcesFunction);
+                    calculateRequiredSlotsFunction, tryReserveResourcesFunction);
         }
     }
 }

Reply via email to