This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 76049d09e40a82f1222ed8a2e8279eda183f43f5
Author: Roc Marshal <[email protected]>
AuthorDate: Fri Jul 26 10:48:22 2024 +0800

    [FLINK-33386][runtime] Support tasks balancing at slot level for Default 
Scheduler
---
 .../scheduler/AbstractSlotSharingStrategy.java     | 113 ++++++++
 .../scheduler/ExecutionSlotSharingGroup.java       |  22 +-
 .../LocalInputPreferredSlotSharingStrategy.java    |  90 +-----
 .../TaskBalancedPreferredSlotSharingStrategy.java  | 310 +++++++++++++++++++++
 .../scheduler/AbstractSlotSharingStrategyTest.java | 140 ++++++++++
 ...LocalInputPreferredSlotSharingStrategyTest.java | 260 ++++++++---------
 .../MergingSharedSlotProfileRetrieverTest.java     |   7 +-
 .../runtime/scheduler/SharedSlotTestingUtils.java  |   3 +-
 .../SlotSharingExecutionSlotAllocatorTest.java     |   6 +-
 ...skBalancedPreferredSlotSharingStrategyTest.java | 178 ++++++++++++
 10 files changed, 894 insertions(+), 235 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java
new file mode 100644
index 00000000000..d13c62909b3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Base abstract implementation for {@link SlotSharingStrategy}. */
+abstract class AbstractSlotSharingStrategy
+        implements SlotSharingStrategy, SchedulingTopologyListener {
+
+    protected final Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
executionSlotSharingGroupMap;
+
+    protected final Set<SlotSharingGroup> logicalSlotSharingGroups;
+
+    protected final Set<CoLocationGroup> coLocationGroups;
+
+    AbstractSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        this.logicalSlotSharingGroups = checkNotNull(slotSharingGroups);
+        this.coLocationGroups = checkNotNull(coLocationGroups);
+
+        this.executionSlotSharingGroupMap = 
computeExecutionSlotSharingGroups(topology);
+        topology.registerSchedulingTopologyListener(this);
+    }
+
+    @Override
+    public ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
+            final ExecutionVertexID executionVertexId) {
+        return executionSlotSharingGroupMap.get(executionVertexId);
+    }
+
+    @Override
+    public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() {
+        return new HashSet<>(executionSlotSharingGroupMap.values());
+    }
+
+    @Override
+    public void notifySchedulingTopologyUpdated(
+            SchedulingTopology schedulingTopology, List<ExecutionVertexID> 
newExecutionVertices) {
+        final Map<ExecutionVertexID, ExecutionSlotSharingGroup> newMap =
+                computeExecutionSlotSharingGroups(schedulingTopology);
+
+        for (ExecutionVertexID vertexId : newMap.keySet()) {
+            final ExecutionSlotSharingGroup newEssg = newMap.get(vertexId);
+            final ExecutionSlotSharingGroup oldEssg = 
executionSlotSharingGroupMap.get(vertexId);
+            if (oldEssg == null) {
+                executionSlotSharingGroupMap.put(vertexId, newEssg);
+            } else {
+                // ensures that existing slot sharing groups are not changed
+                checkState(
+                        
oldEssg.getExecutionVertexIds().equals(newEssg.getExecutionVertexIds()),
+                        "Existing ExecutionSlotSharingGroups are changed after 
topology update");
+            }
+        }
+    }
+
+    /**
+     * The vertices are topologically sorted since {@link 
DefaultExecutionTopology#getVertices} are
+     * topologically sorted.
+     */
+    @Nonnull
+    static LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
getExecutionVertices(
+            SchedulingTopology topology) {
+        final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
vertices =
+                new LinkedHashMap<>();
+        for (SchedulingExecutionVertex executionVertex : 
topology.getVertices()) {
+            final List<SchedulingExecutionVertex> executionVertexGroup =
+                    vertices.computeIfAbsent(
+                            executionVertex.getId().getJobVertexId(), k -> new 
ArrayList<>());
+            executionVertexGroup.add(executionVertex);
+        }
+        return vertices;
+    }
+
+    protected abstract Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+            computeExecutionSlotSharingGroups(SchedulingTopology 
schedulingTopology);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
index cf513eb477f..317f0ef99ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java
@@ -18,10 +18,14 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -31,9 +35,10 @@ class ExecutionSlotSharingGroup {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    @Nonnull private final SlotSharingGroup slotSharingGroup;
 
-    ExecutionSlotSharingGroup() {
+    ExecutionSlotSharingGroup(@Nonnull SlotSharingGroup slotSharingGroup) {
+        this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup);
         this.executionVertexIds = new HashSet<>();
     }
 
@@ -41,12 +46,15 @@ class ExecutionSlotSharingGroup {
         executionVertexIds.add(executionVertexId);
     }
 
-    void setResourceProfile(ResourceProfile resourceProfile) {
-        this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
+    @VisibleForTesting
+    @Nonnull
+    SlotSharingGroup getSlotSharingGroup() {
+        return slotSharingGroup;
     }
 
+    @Nonnull
     ResourceProfile getResourceProfile() {
-        return resourceProfile;
+        return slotSharingGroup.getResourceProfile();
     }
 
     Set<ExecutionVertexID> getExecutionVertexIds() {
@@ -58,8 +66,8 @@ class ExecutionSlotSharingGroup {
         return "ExecutionSlotSharingGroup{"
                 + "executionVertexIds="
                 + executionVertexIds
-                + ", resourceProfile="
-                + resourceProfile
+                + ", slotSharingGroup="
+                + slotSharingGroup
                 + '}';
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
index 99b4f1ca85a..f4c0e854512 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -31,7 +30,6 @@ import 
org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -42,69 +40,19 @@ import java.util.Objects;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, 
which are connected and
  * belong to the same SlotSharingGroup, tend to be put in the same 
ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy
-        implements SlotSharingStrategy, SchedulingTopologyListener {
-
-    private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
executionSlotSharingGroupMap;
-
-    private final Set<SlotSharingGroup> logicalSlotSharingGroups;
-
-    private final Set<CoLocationGroup> coLocationGroups;
+class LocalInputPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
 
     LocalInputPreferredSlotSharingStrategy(
             final SchedulingTopology topology,
             final Set<SlotSharingGroup> logicalSlotSharingGroups,
             final Set<CoLocationGroup> coLocationGroups) {
-
-        this.logicalSlotSharingGroups = checkNotNull(logicalSlotSharingGroups);
-        this.coLocationGroups = checkNotNull(coLocationGroups);
-
-        this.executionSlotSharingGroupMap =
-                new ExecutionSlotSharingGroupBuilder(
-                                topology, logicalSlotSharingGroups, 
coLocationGroups)
-                        .build();
-        topology.registerSchedulingTopologyListener(this);
-    }
-
-    @Override
-    public ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
-            final ExecutionVertexID executionVertexId) {
-        return executionSlotSharingGroupMap.get(executionVertexId);
-    }
-
-    @Override
-    public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() {
-        return new HashSet<>(executionSlotSharingGroupMap.values());
-    }
-
-    @Override
-    public void notifySchedulingTopologyUpdated(
-            SchedulingTopology schedulingTopology, List<ExecutionVertexID> 
newExecutionVertices) {
-
-        final Map<ExecutionVertexID, ExecutionSlotSharingGroup> newMap =
-                new 
LocalInputPreferredSlotSharingStrategy.ExecutionSlotSharingGroupBuilder(
-                                schedulingTopology, logicalSlotSharingGroups, 
coLocationGroups)
-                        .build();
-
-        for (ExecutionVertexID vertexId : newMap.keySet()) {
-            final ExecutionSlotSharingGroup newEssg = newMap.get(vertexId);
-            final ExecutionSlotSharingGroup oldEssg = 
executionSlotSharingGroupMap.get(vertexId);
-            if (oldEssg == null) {
-                executionSlotSharingGroupMap.put(vertexId, newEssg);
-            } else {
-                // ensures that existing slot sharing groups are not changed
-                checkState(
-                        
oldEssg.getExecutionVertexIds().equals(newEssg.getExecutionVertexIds()),
-                        "Existing ExecutionSlotSharingGroups are changed after 
topology update");
-            }
-        }
+        super(topology, logicalSlotSharingGroups, coLocationGroups);
     }
 
     static class Factory implements SlotSharingStrategy.Factory {
@@ -119,7 +67,15 @@ class LocalInputPreferredSlotSharingStrategy
         }
     }
 
-    private static class ExecutionSlotSharingGroupBuilder {
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionSlotSharingGroups(
+            SchedulingTopology schedulingTopology) {
+        return new LocalInputPreferredExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, logicalSlotSharingGroups, 
coLocationGroups)
+                .build();
+    }
+
+    private static class LocalInputPreferredExecutionSlotSharingGroupBuilder {
         private final SchedulingTopology topology;
 
         private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
@@ -170,7 +126,7 @@ class LocalInputPreferredSlotSharingStrategy
         private final Map<ConsumedPartitionGroup, 
LinkedHashSet<ExecutionSlotSharingGroup>>
                 candidateGroupsForConsumedPartitionGroup;
 
-        private ExecutionSlotSharingGroupBuilder(
+        private LocalInputPreferredExecutionSlotSharingGroupBuilder(
                 final SchedulingTopology topology,
                 final Set<SlotSharingGroup> logicalSlotSharingGroups,
                 final Set<CoLocationGroup> coLocationGroups) {
@@ -212,7 +168,7 @@ class LocalInputPreferredSlotSharingStrategy
          */
         private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
             final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
-                    getExecutionVertices();
+                    getExecutionVertices(topology);
 
             // loop on job vertices so that an execution vertex will not be 
added into a group
             // if that group better fits another execution vertex
@@ -228,22 +184,6 @@ class LocalInputPreferredSlotSharingStrategy
             return executionSlotSharingGroupMap;
         }
 
-        /**
-         * The vertices are topologically sorted since {@link 
DefaultExecutionTopology#getVertices}
-         * are topologically sorted.
-         */
-        private LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
getExecutionVertices() {
-            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
vertices =
-                    new LinkedHashMap<>();
-            for (SchedulingExecutionVertex executionVertex : 
topology.getVertices()) {
-                final List<SchedulingExecutionVertex> executionVertexGroup =
-                        vertices.computeIfAbsent(
-                                executionVertex.getId().getJobVertexId(), k -> 
new ArrayList<>());
-                executionVertexGroup.add(executionVertex);
-            }
-            return vertices;
-        }
-
         private List<SchedulingExecutionVertex> 
tryFindOptimalAvailableExecutionSlotSharingGroupFor(
                 final List<SchedulingExecutionVertex> executionVertices) {
 
@@ -394,8 +334,8 @@ class LocalInputPreferredSlotSharingStrategy
             final SlotSharingGroup slotSharingGroup =
                     getSlotSharingGroup(executionVertexId.getJobVertexId());
 
-            final ExecutionSlotSharingGroup newGroup = new 
ExecutionSlotSharingGroup();
-            newGroup.setResourceProfile(slotSharingGroup.getResourceProfile());
+            final ExecutionSlotSharingGroup newGroup =
+                    new ExecutionSlotSharingGroup(slotSharingGroup);
 
             // Once a new ExecutionSlotSharingGroup is created, it's available 
for all JobVertices
             // in this SlotSharingGroup
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java
new file mode 100644
index 00000000000..93f6e43ba81
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, 
which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each 
ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends 
AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            
LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> 
computeExecutionSlotSharingGroups(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, 
this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> slotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, slotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /** Record the {@link ExecutionSlotSharingGroup}s for {@link 
SlotSharingGroup}s. */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>>
+                paralleledExecutionSlotSharingGroupsMap;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index 
for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
+                constraintToExecutionSlotSharingGroupMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> slotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) 
{
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+            this.constraintToExecutionSlotSharingGroupMap = new HashMap<>();
+            this.paralleledExecutionSlotSharingGroupsMap = new 
HashMap<>(slotSharingGroups.size());
+            this.slotSharingGroupIndexMap = new 
HashMap<>(slotSharingGroups.size());
+            this.slotSharingGroupMap = new HashMap<>();
+            this.executionSlotSharingGroupMap = new HashMap<>();
+
+            for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
+                for (JobVertexID jobVertexId : 
slotSharingGroup.getJobVertexIds()) {
+                    slotSharingGroupMap.put(jobVertexId, slotSharingGroup);
+                }
+            }
+        }
+
+        private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() {
+
+            final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> 
allVertices =
+                    getExecutionVertices(topology);
+
+            initParalleledExecutionSlotSharingGroupsMap(allVertices);
+
+            // Loop on job vertices
+            for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> 
executionVertexInfos :
+                    allVertices.entrySet()) {
+
+                JobVertexID jobVertexID = executionVertexInfos.getKey();
+                List<SchedulingExecutionVertex> executionVertices = 
executionVertexInfos.getValue();
+                final SlotSharingGroup slotSharingGroup = 
slotSharingGroupMap.get(jobVertexID);
+
+                if (!coLocationGroupMap.containsKey(jobVertexID)) {
+                    // For vertices without CoLocationConstraint.
+                    allocateNonCoLocatedVertices(slotSharingGroup, 
executionVertices);
+                } else {
+                    // For vertices with CoLocationConstraint.
+                    allocateCoLocatedVertices(slotSharingGroup, 
executionVertices);
+                }
+            }
+            return executionSlotSharingGroupMap;
+        }
+
+        private void initParalleledExecutionSlotSharingGroupsMap(
+                final LinkedHashMap<JobVertexID, 
List<SchedulingExecutionVertex>> allVertices) {
+
+            allVertices.entrySet().stream()
+                    .map(
+                            jobVertexExecutionVertices ->
+                                    Tuple2.of(
+                                            slotSharingGroupMap.get(
+                                                    
jobVertexExecutionVertices.getKey()),
+                                            
jobVertexExecutionVertices.getValue().size()))
+                    .collect(
+                            Collectors.groupingBy(
+                                    tuple -> tuple.f0,
+                                    Collectors.summarizingInt(tuple -> 
tuple.f1)))
+                    .forEach(
+                            (slotSharingGroup, statistics) -> {
+                                int slotNum = statistics.getMax();
+                                paralleledExecutionSlotSharingGroupsMap.put(
+                                        slotSharingGroup,
+                                        createExecutionSlotSharingGroups(
+                                                slotSharingGroup, slotNum));
+                            });
+        }
+
+        private List<ExecutionSlotSharingGroup> 
createExecutionSlotSharingGroups(
+                SlotSharingGroup slotSharingGroup, int slotNum) {
+            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
+                    new ArrayList<>(slotNum);
+            for (int i = 0; i < slotNum; i++) {
+                final ExecutionSlotSharingGroup executionSlotSharingGroup =
+                        new ExecutionSlotSharingGroup(slotSharingGroup);
+                executionSlotSharingGroups.add(i, executionSlotSharingGroup);
+                LOG.debug(
+                        "Create {}th executionSlotSharingGroup {}.", i, 
executionSlotSharingGroup);
+            }
+            return executionSlotSharingGroups;
+        }
+
+        private void allocateCoLocatedVertices(
+                SlotSharingGroup slotSharingGroup,
+                List<SchedulingExecutionVertex> executionVertices) {
+
+            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
+                    
paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
+            for (SchedulingExecutionVertex executionVertex : 
executionVertices) {
+                final CoLocationConstraint coLocationConstraint =
+                        getCoLocationConstraint(executionVertex);
+                ExecutionSlotSharingGroup executionSlotSharingGroup =
+                        
constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint);
+                if (Objects.isNull(executionSlotSharingGroup)) {
+                    executionSlotSharingGroup =
+                            executionSlotSharingGroups.get(
+                                    getLeastUtilizeSlotIndex(
+                                            executionSlotSharingGroups, 
executionVertex));
+                    constraintToExecutionSlotSharingGroupMap.put(
+                            coLocationConstraint, executionSlotSharingGroup);
+                }
+                
addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, 
executionVertex);
+            }
+            final int jobVertexParallel = executionVertices.size();
+            if (!isMaxParallelism(jobVertexParallel, slotSharingGroup)) {
+                int index = 
getLeastUtilizeSlotIndex(executionSlotSharingGroups, null);
+                updateSlotRoundRobinIndexIfNeeded(jobVertexParallel, 
slotSharingGroup, index);
+            }
+        }
+
+        private void allocateNonCoLocatedVertices(
+                SlotSharingGroup slotSharingGroup,
+                List<SchedulingExecutionVertex> executionVertices) {
+            final int jobVertexParallel = executionVertices.size();
+            int index = getSlotRoundRobinIndex(jobVertexParallel, 
slotSharingGroup);
+            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
+                    
paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
+            for (SchedulingExecutionVertex executionVertex : 
executionVertices) {
+                addVertexToExecutionSlotSharingGroup(
+                        executionSlotSharingGroups.get(index), 
executionVertex);
+                index = ++index % executionSlotSharingGroups.size();
+            }
+            updateSlotRoundRobinIndexIfNeeded(executionVertices.size(), 
slotSharingGroup, index);
+        }
+
+        private void addVertexToExecutionSlotSharingGroup(
+                ExecutionSlotSharingGroup executionSlotSharingGroup,
+                SchedulingExecutionVertex executionVertex) {
+            final ExecutionVertexID executionVertexId = 
executionVertex.getId();
+            executionSlotSharingGroup.addVertex(executionVertexId);
+            executionSlotSharingGroupMap.put(executionVertexId, 
executionSlotSharingGroup);
+        }
+
+        private CoLocationConstraint 
getCoLocationConstraint(SchedulingExecutionVertex sev) {
+            final JobVertexID jobVertexID = sev.getId().getJobVertexId();
+            final int subtaskIndex = sev.getId().getSubtaskIndex();
+            return 
coLocationGroupMap.get(jobVertexID).getLocationConstraint(subtaskIndex);
+        }
+
+        private int getSlotRoundRobinIndex(
+                final int jobVertexParallelism, SlotSharingGroup 
slotSharingGroup) {
+            final boolean maxParallel = isMaxParallelism(jobVertexParallelism, 
slotSharingGroup);
+            return maxParallel ? 0 : 
slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0);
+        }
+
+        private void updateSlotRoundRobinIndexIfNeeded(
+                final int jobVertexParallelism,
+                final SlotSharingGroup slotSharingGroup,
+                final int nextIndex) {
+            if (!isMaxParallelism(jobVertexParallelism, slotSharingGroup)) {
+                slotSharingGroupIndexMap.put(slotSharingGroup, nextIndex);
+            }
+        }
+
+        private boolean isMaxParallelism(
+                final int jobVertexParallelism, final SlotSharingGroup 
slotSharingGroup) {
+            final List<ExecutionSlotSharingGroup> executionSlotSharingGroups =
+                    
paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup);
+            return jobVertexParallelism == executionSlotSharingGroups.size();
+        }
+
+        private int getLeastUtilizeSlotIndex(
+                final List<ExecutionSlotSharingGroup> 
executionSlotSharingGroups,
+                @Nullable final SchedulingExecutionVertex executionVertex) {
+            int indexWithLeastExecutionVertices = 0;
+            int leastExecutionVertices = Integer.MAX_VALUE;
+            for (int index = 0; index < executionSlotSharingGroups.size(); 
index++) {
+                final ExecutionSlotSharingGroup executionSlotSharingGroup =
+                        executionSlotSharingGroups.get(index);
+                final int executionVertices =
+                        
executionSlotSharingGroup.getExecutionVertexIds().size();
+                if (leastExecutionVertices > executionVertices
+                        && (Objects.isNull(executionVertex)
+                                || allocatable(executionSlotSharingGroup, 
executionVertex))) {
+                    indexWithLeastExecutionVertices = index;
+                    leastExecutionVertices = executionVertices;
+                }
+            }
+            return indexWithLeastExecutionVertices;
+        }
+
+        private boolean allocatable(
+                final ExecutionSlotSharingGroup executionSlotSharingGroup,
+                @Nonnull SchedulingExecutionVertex executionVertex) {
+            final ExecutionVertexID executionVertexId = 
executionVertex.getId();
+            final JobVertexID jobVertexId = executionVertexId.getJobVertexId();
+            final Set<JobVertexID> allocatedJobVertices =
+                    executionSlotSharingGroup.getExecutionVertexIds().stream()
+                            .map(ExecutionVertexID::getJobVertexId)
+                            .collect(Collectors.toSet());
+            return !allocatedJobVertices.contains(jobVertexId);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategyTest.java
new file mode 100644
index 00000000000..c0fc8ba94e2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategyTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import 
org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Abstract test class base for {@link SlotSharingStrategy}. */
+abstract class AbstractSlotSharingStrategyTest {
+
+    protected TestingSchedulingTopology topology;
+
+    protected JobVertexID jobVertexId1;
+    protected JobVertexID jobVertexId2;
+
+    protected SlotSharingGroup slotSharingGroup;
+    protected SlotSharingGroup slotSharingGroup1;
+    protected SlotSharingGroup slotSharingGroup2;
+
+    @BeforeEach
+    protected void setup() {
+        this.topology = new TestingSchedulingTopology();
+        this.jobVertexId1 = new JobVertexID();
+        this.jobVertexId2 = new JobVertexID();
+        this.slotSharingGroup = new SlotSharingGroup();
+        this.slotSharingGroup1 = new SlotSharingGroup();
+        this.slotSharingGroup2 = new SlotSharingGroup();
+    }
+
+    @Test
+    void testSetSlotSharingGroupResource() {
+        final TestingSchedulingExecutionVertex ev10 = 
topology.newExecutionVertex(jobVertexId1, 0);
+        final TestingSchedulingExecutionVertex ev11 = 
topology.newExecutionVertex(jobVertexId1, 1);
+        final TestingSchedulingExecutionVertex ev20 = 
topology.newExecutionVertex(jobVertexId2, 0);
+
+        final ResourceProfile resourceProfile1 = 
ResourceProfile.fromResources(1, 10);
+        slotSharingGroup1.addVertexToGroup(jobVertexId1);
+        slotSharingGroup1.setResourceProfile(resourceProfile1);
+
+        final ResourceProfile resourceProfile2 = 
ResourceProfile.fromResources(2, 20);
+        slotSharingGroup2.addVertexToGroup(jobVertexId2);
+        slotSharingGroup2.setResourceProfile(resourceProfile2);
+
+        final SlotSharingStrategy strategy =
+                getSlotSharingStrategy(
+                        topology,
+                        Sets.newHashSet(slotSharingGroup1, slotSharingGroup2),
+                        Sets.newHashSet());
+
+        assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(3);
+        
assertThat(strategy.getExecutionSlotSharingGroup(ev10.getId()).getResourceProfile())
+                .isEqualTo(resourceProfile1);
+        
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getResourceProfile())
+                .isEqualTo(resourceProfile1);
+        
assertThat(strategy.getExecutionSlotSharingGroup(ev20.getId()).getResourceProfile())
+                .isEqualTo(resourceProfile2);
+    }
+
+    protected abstract SlotSharingStrategy getSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups);
+
+    protected void renderTopology(
+            TestingSchedulingTopology topology,
+            List<TestingJobVertexInfo> mockedJobVertices,
+            List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> 
jobVertexInfos) {
+        for (TestingJobVertexInfo testingJobVertexInfo : mockedJobVertices) {
+            List<TestingSchedulingExecutionVertex> tSEVs = new ArrayList<>();
+            for (int subIndex = 0; subIndex < 
testingJobVertexInfo.parallelism; subIndex++) {
+                tSEVs.add(
+                        topology.newExecutionVertex(
+                                testingJobVertexInfo.jobVertex.getID(), 
subIndex));
+            }
+            
jobVertexInfos.add(Tuple2.of(testingJobVertexInfo.jobVertex.getID(), tSEVs));
+        }
+    }
+
+    /** Util class to represent the simple job vertex information. */
+    protected static class TestingJobVertexInfo {
+        final JobVertex jobVertex = new JobVertex(null, new JobVertexID());
+        @Nonnull SlotSharingGroup slotSharingGroup;
+        @Nullable CoLocationGroup coLocationGroup;
+        int parallelism;
+
+        public TestingJobVertexInfo(
+                int parallelism,
+                @Nonnull SlotSharingGroup slotSharingGroup,
+                @Nullable CoLocationGroup coLocationGroup) {
+            Preconditions.checkArgument(parallelism > 0);
+            this.parallelism = parallelism;
+            this.slotSharingGroup = slotSharingGroup;
+            this.coLocationGroup = coLocationGroup;
+
+            this.slotSharingGroup.addVertexToGroup(jobVertex.getID());
+            if (this.coLocationGroup != null) {
+                ((CoLocationGroupImpl) 
this.coLocationGroup).addVertex(jobVertex);
+            }
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java
index dcdaa96c947..829c4865a81 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
@@ -33,15 +33,17 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
 import 
org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
-import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorExtension;
 
-import org.junit.jupiter.api.BeforeEach;
+import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
+
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -53,83 +55,41 @@ import java.util.concurrent.ScheduledExecutorService;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link LocalInputPreferredSlotSharingStrategy}. */
-class LocalInputPreferredSlotSharingStrategyTest {
+class LocalInputPreferredSlotSharingStrategyTest extends 
AbstractSlotSharingStrategyTest {
 
     @RegisterExtension
     private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_EXTENSION =
             TestingUtils.defaultExecutorExtension();
 
-    private TestingSchedulingTopology topology;
-
-    private static final JobVertexID JOB_VERTEX_ID_1 = new JobVertexID();
-    private static final JobVertexID JOB_VERTEX_ID_2 = new JobVertexID();
+    private final JobVertexID jobVertexId3 = new JobVertexID();
 
     private TestingSchedulingExecutionVertex ev11;
     private TestingSchedulingExecutionVertex ev12;
     private TestingSchedulingExecutionVertex ev21;
     private TestingSchedulingExecutionVertex ev22;
-
-    private Set<SlotSharingGroup> slotSharingGroups;
-
-    @BeforeEach
-    void setUp() {
-        topology = new TestingSchedulingTopology();
-
-        ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
-        ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1);
-
-        ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
-        ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1);
-
-        final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
-        slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
-        slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
-        slotSharingGroups = Collections.singleton(slotSharingGroup);
-    }
-
-    @Test
-    void testCoLocationConstraintIsRespected() {
-        topology.connect(ev11, ev22);
-        topology.connect(ev12, ev21);
-
-        final CoLocationGroup coLocationGroup =
-                new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2);
-        final Set<CoLocationGroup> coLocationGroups = 
Collections.singleton(coLocationGroup);
-
-        final SlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology, slotSharingGroups, coLocationGroups);
-
-        assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
-        
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds())
-                .contains(ev11.getId(), ev21.getId());
-        
assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds())
-                .contains(ev12.getId(), ev22.getId());
+    private TestingSchedulingExecutionVertex ev23;
+
+    @Override
+    protected SlotSharingStrategy getSlotSharingStrategy(
+            SchedulingTopology topology,
+            Set<SlotSharingGroup> slotSharingGroups,
+            Set<CoLocationGroup> coLocationGroups) {
+        return new LocalInputPreferredSlotSharingStrategy(
+                topology, slotSharingGroups, coLocationGroups);
     }
 
     @Test
     void testInputLocalityIsRespectedWithRescaleEdge() {
-        final TestingSchedulingTopology topology = new 
TestingSchedulingTopology();
-
-        final TestingSchedulingExecutionVertex ev11 =
-                topology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
-        final TestingSchedulingExecutionVertex ev12 =
-                topology.newExecutionVertex(JOB_VERTEX_ID_1, 1);
-
-        final TestingSchedulingExecutionVertex ev21 =
-                topology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
-        final TestingSchedulingExecutionVertex ev22 =
-                topology.newExecutionVertex(JOB_VERTEX_ID_2, 1);
-        final TestingSchedulingExecutionVertex ev23 =
-                topology.newExecutionVertex(JOB_VERTEX_ID_2, 2);
+        createTwoExeVerticesPerJv1AndJv2(slotSharingGroup);
+        ev23 = topology.newExecutionVertex(jobVertexId2, 2);
 
         topology.connect(ev11, ev21);
         topology.connect(ev11, ev22);
         topology.connect(ev12, ev23);
 
         final SlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology, slotSharingGroups, Collections.emptySet());
+                getSlotSharingStrategy(
+                        topology, Sets.newHashSet(slotSharingGroup), 
Collections.emptySet());
 
         assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(3);
         
assertThat(strategy.getExecutionSlotSharingGroup(ev21.getId()).getExecutionVertexIds())
@@ -140,19 +100,30 @@ class LocalInputPreferredSlotSharingStrategyTest {
                 .contains(ev12.getId(), ev23.getId());
     }
 
+    private void createTwoExeVerticesPerJv1AndJv2(SlotSharingGroup 
sharingGroup) {
+        ev11 = topology.newExecutionVertex(jobVertexId1, 0);
+        ev12 = topology.newExecutionVertex(jobVertexId1, 1);
+
+        ev21 = topology.newExecutionVertex(jobVertexId2, 0);
+        ev22 = topology.newExecutionVertex(jobVertexId2, 1);
+        sharingGroup.addVertexToGroup(jobVertexId1);
+        sharingGroup.addVertexToGroup(jobVertexId2);
+    }
+
     @Test
     void testInputLocalityIsRespectedWithAllToAllEdge() {
-        final TestingSchedulingTopology topology = new 
TestingSchedulingTopology();
+        slotSharingGroup.addVertexToGroup(jobVertexId1);
+        slotSharingGroup.addVertexToGroup(jobVertexId2);
 
         final List<TestingSchedulingExecutionVertex> producer =
                 topology.addExecutionVertices()
                         .withParallelism(2)
-                        .withJobVertexID(JOB_VERTEX_ID_1)
+                        .withJobVertexID(jobVertexId1)
                         .finish();
         final List<TestingSchedulingExecutionVertex> consumer =
                 topology.addExecutionVertices()
                         .withParallelism(2)
-                        .withJobVertexID(JOB_VERTEX_ID_2)
+                        .withJobVertexID(jobVertexId2)
                         .finish();
 
         topology.connectAllToAll(producer, consumer)
@@ -166,8 +137,8 @@ class LocalInputPreferredSlotSharingStrategyTest {
         ev22 = consumer.get(1);
 
         final SlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology, slotSharingGroups, Collections.emptySet());
+                getSlotSharingStrategy(
+                        topology, Sets.newHashSet(slotSharingGroup), 
Collections.emptySet());
         assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
         
assertThat(strategy.getExecutionSlotSharingGroup(ev21.getId()).getExecutionVertexIds())
                 .contains(ev11.getId(), ev21.getId());
@@ -175,11 +146,56 @@ class LocalInputPreferredSlotSharingStrategyTest {
                 .contains(ev12.getId(), ev22.getId());
     }
 
+    @Test
+    void testCoLocationConstraintIsRespected() {
+        List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> 
jobVertexInfos =
+                new ArrayList<>();
+        CoLocationGroup coLocationGroup1 = new CoLocationGroupImpl();
+        CoLocationGroup coLocationGroup2 = new CoLocationGroupImpl();
+        List<TestingJobVertexInfo> mockedJobVertices =
+                Lists.newArrayList(
+                        new TestingJobVertexInfo(1, slotSharingGroup, null),
+                        new TestingJobVertexInfo(2, slotSharingGroup, 
coLocationGroup1),
+                        new TestingJobVertexInfo(2, slotSharingGroup, 
coLocationGroup1),
+                        new TestingJobVertexInfo(3, slotSharingGroup, 
coLocationGroup2),
+                        new TestingJobVertexInfo(3, slotSharingGroup, 
coLocationGroup2));
+        renderTopology(topology, mockedJobVertices, jobVertexInfos);
+
+        final SlotSharingStrategy strategy =
+                getSlotSharingStrategy(
+                        topology,
+                        Sets.newHashSet(slotSharingGroup),
+                        Sets.newHashSet(coLocationGroup1, coLocationGroup2));
+        List<TestingSchedulingExecutionVertex> executionVertices1 = 
jobVertexInfos.get(1).f1;
+        List<TestingSchedulingExecutionVertex> executionVertices2 = 
jobVertexInfos.get(2).f1;
+
+        assertThat(executionVertices1).hasSameSizeAs(executionVertices2);
+        for (int i = 0; i < executionVertices1.size(); i++) {
+            ExecutionSlotSharingGroup executionSlotSharingGroup =
+                    
strategy.getExecutionSlotSharingGroup(executionVertices1.get(i).getId());
+            assertThat(executionSlotSharingGroup)
+                    .isEqualTo(
+                            strategy.getExecutionSlotSharingGroup(
+                                    executionVertices2.get(i).getId()));
+        }
+
+        List<TestingSchedulingExecutionVertex> executionVertices3 = 
jobVertexInfos.get(3).f1;
+        List<TestingSchedulingExecutionVertex> executionVertices4 = 
jobVertexInfos.get(4).f1;
+        assertThat(executionVertices3).hasSameSizeAs(executionVertices4);
+        for (int i = 0; i < executionVertices3.size(); i++) {
+            
assertThat(strategy.getExecutionSlotSharingGroup(executionVertices3.get(i).getId()))
+                    .isEqualTo(
+                            strategy.getExecutionSlotSharingGroup(
+                                    executionVertices4.get(i).getId()));
+        }
+    }
+
     @Test
     void testDisjointVerticesInOneGroup() {
+        createTwoExeVerticesPerJv1AndJv2(slotSharingGroup);
         final SlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology, slotSharingGroups, Collections.emptySet());
+                getSlotSharingStrategy(
+                        topology, Sets.newHashSet(slotSharingGroup), 
Collections.emptySet());
 
         assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
         
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds())
@@ -190,18 +206,20 @@ class LocalInputPreferredSlotSharingStrategyTest {
 
     @Test
     void testVerticesInDifferentSlotSharingGroups() {
-        final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
-        slotSharingGroup1.addVertexToGroup(JOB_VERTEX_ID_1);
-        final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
-        slotSharingGroup2.addVertexToGroup(JOB_VERTEX_ID_2);
 
-        final Set<SlotSharingGroup> slotSharingGroups = new HashSet<>();
-        slotSharingGroups.add(slotSharingGroup1);
-        slotSharingGroups.add(slotSharingGroup2);
+        ev11 = topology.newExecutionVertex(jobVertexId1, 0);
+        ev12 = topology.newExecutionVertex(jobVertexId1, 1);
+        ev21 = topology.newExecutionVertex(jobVertexId2, 0);
+        ev22 = topology.newExecutionVertex(jobVertexId2, 1);
+
+        slotSharingGroup1.addVertexToGroup(jobVertexId1);
+        slotSharingGroup2.addVertexToGroup(jobVertexId2);
 
         final SlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology, slotSharingGroups, Collections.emptySet());
+                getSlotSharingStrategy(
+                        topology,
+                        Sets.newHashSet(slotSharingGroup1, slotSharingGroup2),
+                        Collections.emptySet());
 
         assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(4);
         
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds())
@@ -214,46 +232,17 @@ class LocalInputPreferredSlotSharingStrategyTest {
                 .contains(ev22.getId());
     }
 
-    @Test
-    void testSetSlotSharingGroupResource() {
-        final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
-        final ResourceProfile resourceProfile1 = 
ResourceProfile.fromResources(1, 10);
-        slotSharingGroup1.addVertexToGroup(JOB_VERTEX_ID_1);
-        slotSharingGroup1.setResourceProfile(resourceProfile1);
-        final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
-        final ResourceProfile resourceProfile2 = 
ResourceProfile.fromResources(2, 20);
-        slotSharingGroup2.addVertexToGroup(JOB_VERTEX_ID_2);
-        slotSharingGroup2.setResourceProfile(resourceProfile2);
-
-        final Set<SlotSharingGroup> slotSharingGroups = new HashSet<>();
-        slotSharingGroups.add(slotSharingGroup1);
-        slotSharingGroups.add(slotSharingGroup2);
-
-        final SlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology, slotSharingGroups, Collections.emptySet());
-
-        assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(4);
-        
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getResourceProfile())
-                .isEqualTo(resourceProfile1);
-        
assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getResourceProfile())
-                .isEqualTo(resourceProfile1);
-        
assertThat(strategy.getExecutionSlotSharingGroup(ev21.getId()).getResourceProfile())
-                .isEqualTo(resourceProfile2);
-        
assertThat(strategy.getExecutionSlotSharingGroup(ev22.getId()).getResourceProfile())
-                .isEqualTo(resourceProfile2);
-    }
-
     /**
      * In this test case, there are two JobEdges between two JobVertices. 
There will be no
      * ExecutionSlotSharingGroup that contains two vertices with the same 
JobVertexID.
      */
     @Test
     void testInputLocalityIsRespectedWithTwoEdgesBetweenTwoVertices() throws 
Exception {
+        createTwoExeVerticesPerJv1AndJv2(slotSharingGroup);
         int parallelism = 4;
 
-        JobVertex v1 = createJobVertex("v1", JOB_VERTEX_ID_1, parallelism);
-        JobVertex v2 = createJobVertex("v2", JOB_VERTEX_ID_2, parallelism);
+        JobVertex v1 = createJobVertex("v1", jobVertexId1, parallelism);
+        JobVertex v2 = createJobVertex("v2", jobVertexId2, parallelism);
 
         v2.connectNewDataSetAsInput(
                 v1, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
@@ -271,17 +260,15 @@ class LocalInputPreferredSlotSharingStrategyTest {
         final SchedulingTopology topology = 
executionGraph.getSchedulingTopology();
 
         final SlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology, slotSharingGroups, Collections.emptySet());
+                getSlotSharingStrategy(
+                        topology, Sets.newHashSet(slotSharingGroup), 
Collections.emptySet());
 
         assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(4);
 
         ExecutionVertex[] ev1 =
-                
Objects.requireNonNull(executionGraph.getJobVertex(JOB_VERTEX_ID_1))
-                        .getTaskVertices();
+                
Objects.requireNonNull(executionGraph.getJobVertex(jobVertexId1)).getTaskVertices();
         ExecutionVertex[] ev2 =
-                
Objects.requireNonNull(executionGraph.getJobVertex(JOB_VERTEX_ID_2))
-                        .getTaskVertices();
+                
Objects.requireNonNull(executionGraph.getJobVertex(jobVertexId2)).getTaskVertices();
         for (int i = 0; i < parallelism; i++) {
             assertThat(
                             
strategy.getExecutionSlotSharingGroup(ev1[i].getID())
@@ -292,29 +279,20 @@ class LocalInputPreferredSlotSharingStrategyTest {
 
     @Test
     void testGetExecutionSlotSharingGroupOfLateAttachedVertices() {
+        slotSharingGroup1.addVertexToGroup(jobVertexId1);
+        slotSharingGroup1.addVertexToGroup(jobVertexId2);
+        slotSharingGroup2.addVertexToGroup(jobVertexId3);
 
-        JobVertexID jobVertexID1 = new JobVertexID();
-        JobVertexID jobVertexID2 = new JobVertexID();
-        JobVertexID jobVertexID3 = new JobVertexID();
-
-        final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
-        slotSharingGroup1.addVertexToGroup(jobVertexID1);
-        slotSharingGroup1.addVertexToGroup(jobVertexID2);
-
-        final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
-        slotSharingGroup2.addVertexToGroup(jobVertexID3);
-
-        TestingSchedulingTopology topology = new TestingSchedulingTopology();
-
-        TestingSchedulingExecutionVertex ev1 = 
topology.newExecutionVertex(jobVertexID1, 0);
-        TestingSchedulingExecutionVertex ev2 = 
topology.newExecutionVertex(jobVertexID2, 0);
+        TestingSchedulingExecutionVertex ev1 = 
topology.newExecutionVertex(jobVertexId1, 0);
+        TestingSchedulingExecutionVertex ev2 = 
topology.newExecutionVertex(jobVertexId2, 0);
         topology.connect(ev1, ev2);
 
         final LocalInputPreferredSlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology,
-                        new HashSet<>(Arrays.asList(slotSharingGroup1, 
slotSharingGroup2)),
-                        Collections.emptySet());
+                (LocalInputPreferredSlotSharingStrategy)
+                        getSlotSharingStrategy(
+                                topology,
+                                new HashSet<>(Arrays.asList(slotSharingGroup1, 
slotSharingGroup2)),
+                                Collections.emptySet());
 
         assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(1);
         
assertThat(strategy.getExecutionSlotSharingGroup(ev1.getId()).getExecutionVertexIds())
@@ -323,7 +301,7 @@ class LocalInputPreferredSlotSharingStrategyTest {
                 .contains(ev1.getId(), ev2.getId());
 
         // add new job vertices and notify scheduling topology updated
-        TestingSchedulingExecutionVertex ev3 = 
topology.newExecutionVertex(jobVertexID3, 0);
+        TestingSchedulingExecutionVertex ev3 = 
topology.newExecutionVertex(jobVertexId3, 0);
         topology.connect(ev2, ev3, ResultPartitionType.BLOCKING);
         strategy.notifySchedulingTopologyUpdated(topology, 
Collections.singletonList(ev3.getId()));
 
@@ -343,18 +321,4 @@ class LocalInputPreferredSlotSharingStrategyTest {
         jobVertex.setInvokableClass(NoOpInvokable.class);
         return jobVertex;
     }
-
-    private static class TestingCoLocationGroup extends CoLocationGroupImpl {
-
-        private final List<JobVertexID> vertexIDs;
-
-        private TestingCoLocationGroup(JobVertexID... vertexIDs) {
-            this.vertexIDs = Arrays.asList(vertexIDs);
-        }
-
-        @Override
-        public List<JobVertexID> getVertexIds() {
-            return vertexIDs;
-        }
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
index 26d96ba189f..68d4361c4f6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -64,7 +65,8 @@ class MergingSharedSlotProfileRetrieverTest {
 
         SlotProfile slotProfile =
                 sharedSlotProfileRetriever.getSlotProfile(
-                        new ExecutionSlotSharingGroup(), ResourceProfile.ZERO);
+                        new ExecutionSlotSharingGroup(new SlotSharingGroup()),
+                        ResourceProfile.ZERO);
 
         
assertThat(slotProfile.getTaskResourceProfile()).isEqualTo(ResourceProfile.ZERO);
         
assertThat(slotProfile.getPhysicalSlotResourceProfile()).isEqualTo(ResourceProfile.ZERO);
@@ -197,7 +199,8 @@ class MergingSharedSlotProfileRetrieverTest {
                                 () -> new HashSet<>(reservedAllocationIds))
                         .createFromBulk(new HashSet<>(executions));
 
-        ExecutionSlotSharingGroup executionSlotSharingGroup = new 
ExecutionSlotSharingGroup();
+        ExecutionSlotSharingGroup executionSlotSharingGroup =
+                new ExecutionSlotSharingGroup(new SlotSharingGroup());
         executions.stream()
                 .limit(executionSlotSharingGroupSize)
                 .forEach(executionSlotSharingGroup::addVertex);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTestingUtils.java
index 39a1580221e..c3126b17c51 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTestingUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 class SharedSlotTestingUtils {
@@ -25,7 +26,7 @@ class SharedSlotTestingUtils {
 
     static ExecutionSlotSharingGroup createExecutionSlotSharingGroup(
             ExecutionVertexID... executions) {
-        ExecutionSlotSharingGroup group = new ExecutionSlotSharingGroup();
+        ExecutionSlotSharingGroup group = new ExecutionSlotSharingGroup(new 
SlotSharingGroup());
         for (ExecutionVertexID execution : executions) {
             group.addVertex(execution);
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
index edab30c0afc..a73ab7ed859 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingPayload;
@@ -642,9 +643,10 @@ class SlotSharingExecutionSlotAllocatorTest {
                     new HashMap<>();
             for (Map.Entry<ExecutionVertexID[], ResourceProfile> 
groupAndResource :
                     groupAndResources.entrySet()) {
+                SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+                
slotSharingGroup.setResourceProfile(groupAndResource.getValue());
                 ExecutionSlotSharingGroup executionSlotSharingGroup =
-                        new ExecutionSlotSharingGroup();
-                
executionSlotSharingGroup.setResourceProfile(groupAndResource.getValue());
+                        new ExecutionSlotSharingGroup(slotSharingGroup);
                 for (ExecutionVertexID executionVertexId : 
groupAndResource.getKey()) {
                     executionSlotSharingGroup.addVertex(executionVertexId);
                     executionSlotSharingGroups.put(executionVertexId, 
executionSlotSharingGroup);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java
new file mode 100644
index 00000000000..c975f238e6e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import 
org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
+
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */
+class TaskBalancedPreferredSlotSharingStrategyTest extends 
AbstractSlotSharingStrategyTest {
+
+    @Override
+    protected SlotSharingStrategy getSlotSharingStrategy(
+            SchedulingTopology topology,
+            Set<SlotSharingGroup> slotSharingGroups,
+            Set<CoLocationGroup> coLocationGroups) {
+        return new TaskBalancedPreferredSlotSharingStrategy(
+                topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Test
+    void testVerticesInDifferentSlotSharingGroups() {
+        List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> 
jobVertexInfos =
+                new ArrayList<>();
+        List<TestingJobVertexInfo> testingJobVertexInfos =
+                Lists.newArrayList(
+                        new TestingJobVertexInfo(1, slotSharingGroup1, null),
+                        new TestingJobVertexInfo(2, slotSharingGroup1, null),
+                        new TestingJobVertexInfo(3, slotSharingGroup1, null),
+                        new TestingJobVertexInfo(1, slotSharingGroup2, null),
+                        new TestingJobVertexInfo(2, slotSharingGroup2, null),
+                        new TestingJobVertexInfo(2, slotSharingGroup2, null));
+
+        renderTopology(topology, testingJobVertexInfos, jobVertexInfos);
+        final SlotSharingStrategy strategy =
+                getSlotSharingStrategy(
+                        topology,
+                        Sets.newHashSet(slotSharingGroup1, slotSharingGroup2),
+                        Sets.newHashSet());
+        assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(5);
+        checkBalanceAtSlotsLevelWithoutCoLocation(strategy);
+
+        List<TestingSchedulingExecutionVertex> executionVertices4 = 
jobVertexInfos.get(4).f1;
+        List<TestingSchedulingExecutionVertex> executionVertices5 = 
jobVertexInfos.get(5).f1;
+        assertThat(executionVertices4).hasSameSizeAs(executionVertices5);
+        // Check for JVs whose parallelism is the max in the same slot sharing 
group.
+        for (int i = 0; i < executionVertices4.size(); i++) {
+            TestingSchedulingExecutionVertex executionVertex4 = 
executionVertices4.get(i);
+            
assertThat(strategy.getExecutionSlotSharingGroup(executionVertex4.getId()))
+                    .isEqualTo(
+                            strategy.getExecutionSlotSharingGroup(
+                                    executionVertices5.get(i).getId()));
+        }
+    }
+
+    private void checkBalanceAtSlotsLevelWithoutCoLocation(SlotSharingStrategy 
strategy) {
+        strategy.getExecutionSlotSharingGroups().stream()
+                
.collect(Collectors.groupingBy(ExecutionSlotSharingGroup::getSlotSharingGroup))
+                .forEach(
+                        (slotSharingGroup, executionSlotSharingGroups) -> {
+                            Optional<Integer> max =
+                                    executionSlotSharingGroups.stream()
+                                            .map(
+                                                    executionSlotSharingGroup 
->
+                                                            
executionSlotSharingGroup
+                                                                    
.getExecutionVertexIds()
+                                                                    .size())
+                                            .max(Comparator.comparing(i -> i));
+                            Optional<Integer> min =
+                                    executionSlotSharingGroups.stream()
+                                            .map(
+                                                    executionSlotSharingGroup 
->
+                                                            
executionSlotSharingGroup
+                                                                    
.getExecutionVertexIds()
+                                                                    .size())
+                                            .min(Comparator.comparing(i -> i));
+                            assertThat(max.get()).isCloseTo(min.get(), 
Offset.offset(1));
+                        });
+    }
+
+    @Test
+    void testCoLocationConstraintIsRespected() {
+        List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> 
jobVertexInfos =
+                new ArrayList<>();
+        CoLocationGroup coLocationGroup1 = new CoLocationGroupImpl();
+        CoLocationGroup coLocationGroup2 = new CoLocationGroupImpl();
+        TestingJobVertexInfo tJv0 = new TestingJobVertexInfo(1, 
slotSharingGroup, null);
+        TestingJobVertexInfo tJv1 = new TestingJobVertexInfo(2, 
slotSharingGroup, coLocationGroup1);
+        TestingJobVertexInfo tJv2 = new TestingJobVertexInfo(2, 
slotSharingGroup, coLocationGroup1);
+        TestingJobVertexInfo tJv3 = new TestingJobVertexInfo(1, 
slotSharingGroup, null);
+        TestingJobVertexInfo tJv4 = new TestingJobVertexInfo(4, 
slotSharingGroup, coLocationGroup1);
+        TestingJobVertexInfo tJv5 = new TestingJobVertexInfo(4, 
slotSharingGroup, coLocationGroup1);
+        TestingJobVertexInfo tJv6 = new TestingJobVertexInfo(3, 
slotSharingGroup, coLocationGroup2);
+        List<TestingJobVertexInfo> mockedJobVertices =
+                Lists.newArrayList(tJv0, tJv1, tJv2, tJv3, tJv4, tJv5, tJv6);
+        renderTopology(topology, mockedJobVertices, jobVertexInfos);
+        SlotSharingStrategy strategy =
+                getSlotSharingStrategy(
+                        topology,
+                        Sets.newHashSet(slotSharingGroup),
+                        Sets.newHashSet(coLocationGroup1, coLocationGroup2));
+
+        List<TestingSchedulingExecutionVertex> jv0Vertices = 
jobVertexInfos.get(0).f1;
+        List<TestingSchedulingExecutionVertex> jv1Vertices = 
jobVertexInfos.get(1).f1;
+        List<TestingSchedulingExecutionVertex> jv2Vertices = 
jobVertexInfos.get(2).f1;
+        List<TestingSchedulingExecutionVertex> jv4Vertices = 
jobVertexInfos.get(4).f1;
+        List<TestingSchedulingExecutionVertex> jv5Vertices = 
jobVertexInfos.get(5).f1;
+        List<TestingSchedulingExecutionVertex> jv6Vertices = 
jobVertexInfos.get(6).f1;
+        // Check vertices of jv1 & jv2
+        for (int i = 0; i < jv1Vertices.size(); i++) {
+            assertThat(getTargetGroup(strategy, jv1Vertices, i))
+                    .isEqualTo(getTargetGroup(strategy, jv2Vertices, i))
+                    .isEqualTo(getTargetGroup(strategy, jv4Vertices, i))
+                    .isEqualTo(getTargetGroup(strategy, jv5Vertices, i));
+        }
+        // Check vertices of jv4 & jv5
+        for (int i = 0; i < jv4Vertices.size(); i++) {
+            assertThat(getTargetGroup(strategy, jv4Vertices, i))
+                    .isEqualTo(getTargetGroup(strategy, jv5Vertices, i));
+        }
+        // Check for tJv4
+        assertThat(getTargetGroup(strategy, jv4Vertices, 2))
+                .isNotEqualTo(getTargetGroup(strategy, jv1Vertices, 0))
+                .isNotEqualTo(getTargetGroup(strategy, jv1Vertices, 1));
+        assertThat(getTargetGroup(strategy, jv4Vertices, 3))
+                .isNotEqualTo(getTargetGroup(strategy, jv1Vertices, 0))
+                .isNotEqualTo(getTargetGroup(strategy, jv1Vertices, 1));
+        // Check for tJv6
+        assertThat(getTargetGroup(strategy, jv6Vertices, 0))
+                .isEqualTo(getTargetGroup(strategy, jv0Vertices, 0));
+        assertThat(getTargetGroup(strategy, jv6Vertices, 1))
+                .isEqualTo(getTargetGroup(strategy, jv4Vertices, 3));
+        assertThat(getTargetGroup(strategy, jv6Vertices, 2))
+                .isEqualTo(getTargetGroup(strategy, jv4Vertices, 0));
+    }
+
+    private ExecutionSlotSharingGroup getTargetGroup(
+            SlotSharingStrategy strategy,
+            List<TestingSchedulingExecutionVertex> jvVertices,
+            int index) {
+        return 
strategy.getExecutionSlotSharingGroup(jvVertices.get(index).getId());
+    }
+}

Reply via email to