This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 76049d09e40a82f1222ed8a2e8279eda183f43f5 Author: Roc Marshal <[email protected]> AuthorDate: Fri Jul 26 10:48:22 2024 +0800 [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler --- .../scheduler/AbstractSlotSharingStrategy.java | 113 ++++++++ .../scheduler/ExecutionSlotSharingGroup.java | 22 +- .../LocalInputPreferredSlotSharingStrategy.java | 90 +----- .../TaskBalancedPreferredSlotSharingStrategy.java | 310 +++++++++++++++++++++ .../scheduler/AbstractSlotSharingStrategyTest.java | 140 ++++++++++ ...LocalInputPreferredSlotSharingStrategyTest.java | 260 ++++++++--------- .../MergingSharedSlotProfileRetrieverTest.java | 7 +- .../runtime/scheduler/SharedSlotTestingUtils.java | 3 +- .../SlotSharingExecutionSlotAllocatorTest.java | 6 +- ...skBalancedPreferredSlotSharingStrategyTest.java | 178 ++++++++++++ 10 files changed, 894 insertions(+), 235 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java new file mode 100644 index 00000000000..d13c62909b3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategy.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** Base abstract implementation for {@link SlotSharingStrategy}. 
*/ +abstract class AbstractSlotSharingStrategy + implements SlotSharingStrategy, SchedulingTopologyListener { + + protected final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap; + + protected final Set<SlotSharingGroup> logicalSlotSharingGroups; + + protected final Set<CoLocationGroup> coLocationGroups; + + AbstractSlotSharingStrategy( + final SchedulingTopology topology, + final Set<SlotSharingGroup> slotSharingGroups, + final Set<CoLocationGroup> coLocationGroups) { + this.logicalSlotSharingGroups = checkNotNull(slotSharingGroups); + this.coLocationGroups = checkNotNull(coLocationGroups); + + this.executionSlotSharingGroupMap = computeExecutionSlotSharingGroups(topology); + topology.registerSchedulingTopologyListener(this); + } + + @Override + public ExecutionSlotSharingGroup getExecutionSlotSharingGroup( + final ExecutionVertexID executionVertexId) { + return executionSlotSharingGroupMap.get(executionVertexId); + } + + @Override + public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() { + return new HashSet<>(executionSlotSharingGroupMap.values()); + } + + @Override + public void notifySchedulingTopologyUpdated( + SchedulingTopology schedulingTopology, List<ExecutionVertexID> newExecutionVertices) { + final Map<ExecutionVertexID, ExecutionSlotSharingGroup> newMap = + computeExecutionSlotSharingGroups(schedulingTopology); + + for (ExecutionVertexID vertexId : newMap.keySet()) { + final ExecutionSlotSharingGroup newEssg = newMap.get(vertexId); + final ExecutionSlotSharingGroup oldEssg = executionSlotSharingGroupMap.get(vertexId); + if (oldEssg == null) { + executionSlotSharingGroupMap.put(vertexId, newEssg); + } else { + // ensures that existing slot sharing groups are not changed + checkState( + oldEssg.getExecutionVertexIds().equals(newEssg.getExecutionVertexIds()), + "Existing ExecutionSlotSharingGroups are changed after topology update"); + } + } + } + + /** + * The vertices are topologically sorted since {@link 
DefaultExecutionTopology#getVertices} are + * topologically sorted. + */ + @Nonnull + static LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> getExecutionVertices( + SchedulingTopology topology) { + final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> vertices = + new LinkedHashMap<>(); + for (SchedulingExecutionVertex executionVertex : topology.getVertices()) { + final List<SchedulingExecutionVertex> executionVertexGroup = + vertices.computeIfAbsent( + executionVertex.getId().getJobVertexId(), k -> new ArrayList<>()); + executionVertexGroup.add(executionVertex); + } + return vertices; + } + + protected abstract Map<ExecutionVertexID, ExecutionSlotSharingGroup> + computeExecutionSlotSharingGroups(SchedulingTopology schedulingTopology); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java index cf513eb477f..317f0ef99ad 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java @@ -18,10 +18,14 @@ package org.apache.flink.runtime.scheduler; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -31,9 +35,10 @@ class ExecutionSlotSharingGroup { private final Set<ExecutionVertexID> executionVertexIds; - private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; + @Nonnull private final SlotSharingGroup slotSharingGroup; - ExecutionSlotSharingGroup() { + 
ExecutionSlotSharingGroup(@Nonnull SlotSharingGroup slotSharingGroup) { + this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup); this.executionVertexIds = new HashSet<>(); } @@ -41,12 +46,15 @@ class ExecutionSlotSharingGroup { executionVertexIds.add(executionVertexId); } - void setResourceProfile(ResourceProfile resourceProfile) { - this.resourceProfile = Preconditions.checkNotNull(resourceProfile); + @VisibleForTesting + @Nonnull + SlotSharingGroup getSlotSharingGroup() { + return slotSharingGroup; } + @Nonnull ResourceProfile getResourceProfile() { - return resourceProfile; + return slotSharingGroup.getResourceProfile(); } Set<ExecutionVertexID> getExecutionVertexIds() { @@ -58,8 +66,8 @@ class ExecutionSlotSharingGroup { return "ExecutionSlotSharingGroup{" + "executionVertexIds=" + executionVertexIds - + ", resourceProfile=" - + resourceProfile + + ", slotSharingGroup=" + + slotSharingGroup + '}'; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java index 99b4f1ca85a..f4c0e854512 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import 
org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; @@ -31,7 +30,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.IdentityHashMap; import java.util.Iterator; import java.util.LinkedHashMap; @@ -42,69 +40,19 @@ import java.util.Objects; import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected and * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. * Co-location constraints will be respected. */ -class LocalInputPreferredSlotSharingStrategy - implements SlotSharingStrategy, SchedulingTopologyListener { - - private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap; - - private final Set<SlotSharingGroup> logicalSlotSharingGroups; - - private final Set<CoLocationGroup> coLocationGroups; +class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy { LocalInputPreferredSlotSharingStrategy( final SchedulingTopology topology, final Set<SlotSharingGroup> logicalSlotSharingGroups, final Set<CoLocationGroup> coLocationGroups) { - - this.logicalSlotSharingGroups = checkNotNull(logicalSlotSharingGroups); - this.coLocationGroups = checkNotNull(coLocationGroups); - - this.executionSlotSharingGroupMap = - new ExecutionSlotSharingGroupBuilder( - topology, logicalSlotSharingGroups, coLocationGroups) - .build(); - topology.registerSchedulingTopologyListener(this); - } - - @Override - public ExecutionSlotSharingGroup getExecutionSlotSharingGroup( - final ExecutionVertexID executionVertexId) { - return executionSlotSharingGroupMap.get(executionVertexId); - } - - @Override - public Set<ExecutionSlotSharingGroup> getExecutionSlotSharingGroups() { - return new 
HashSet<>(executionSlotSharingGroupMap.values()); - } - - @Override - public void notifySchedulingTopologyUpdated( - SchedulingTopology schedulingTopology, List<ExecutionVertexID> newExecutionVertices) { - - final Map<ExecutionVertexID, ExecutionSlotSharingGroup> newMap = - new LocalInputPreferredSlotSharingStrategy.ExecutionSlotSharingGroupBuilder( - schedulingTopology, logicalSlotSharingGroups, coLocationGroups) - .build(); - - for (ExecutionVertexID vertexId : newMap.keySet()) { - final ExecutionSlotSharingGroup newEssg = newMap.get(vertexId); - final ExecutionSlotSharingGroup oldEssg = executionSlotSharingGroupMap.get(vertexId); - if (oldEssg == null) { - executionSlotSharingGroupMap.put(vertexId, newEssg); - } else { - // ensures that existing slot sharing groups are not changed - checkState( - oldEssg.getExecutionVertexIds().equals(newEssg.getExecutionVertexIds()), - "Existing ExecutionSlotSharingGroups are changed after topology update"); - } - } + super(topology, logicalSlotSharingGroups, coLocationGroups); } static class Factory implements SlotSharingStrategy.Factory { @@ -119,7 +67,15 @@ class LocalInputPreferredSlotSharingStrategy } } - private static class ExecutionSlotSharingGroupBuilder { + @Override + protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> computeExecutionSlotSharingGroups( + SchedulingTopology schedulingTopology) { + return new LocalInputPreferredExecutionSlotSharingGroupBuilder( + schedulingTopology, logicalSlotSharingGroups, coLocationGroups) + .build(); + } + + private static class LocalInputPreferredExecutionSlotSharingGroupBuilder { private final SchedulingTopology topology; private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap; @@ -170,7 +126,7 @@ class LocalInputPreferredSlotSharingStrategy private final Map<ConsumedPartitionGroup, LinkedHashSet<ExecutionSlotSharingGroup>> candidateGroupsForConsumedPartitionGroup; - private ExecutionSlotSharingGroupBuilder( + private 
LocalInputPreferredExecutionSlotSharingGroupBuilder( final SchedulingTopology topology, final Set<SlotSharingGroup> logicalSlotSharingGroups, final Set<CoLocationGroup> coLocationGroups) { @@ -212,7 +168,7 @@ class LocalInputPreferredSlotSharingStrategy */ private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() { final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices = - getExecutionVertices(); + getExecutionVertices(topology); // loop on job vertices so that an execution vertex will not be added into a group // if that group better fits another execution vertex @@ -228,22 +184,6 @@ class LocalInputPreferredSlotSharingStrategy return executionSlotSharingGroupMap; } - /** - * The vertices are topologically sorted since {@link DefaultExecutionTopology#getVertices} - * are topologically sorted. - */ - private LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> getExecutionVertices() { - final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> vertices = - new LinkedHashMap<>(); - for (SchedulingExecutionVertex executionVertex : topology.getVertices()) { - final List<SchedulingExecutionVertex> executionVertexGroup = - vertices.computeIfAbsent( - executionVertex.getId().getJobVertexId(), k -> new ArrayList<>()); - executionVertexGroup.add(executionVertex); - } - return vertices; - } - private List<SchedulingExecutionVertex> tryFindOptimalAvailableExecutionSlotSharingGroupFor( final List<SchedulingExecutionVertex> executionVertices) { @@ -394,8 +334,8 @@ class LocalInputPreferredSlotSharingStrategy final SlotSharingGroup slotSharingGroup = getSlotSharingGroup(executionVertexId.getJobVertexId()); - final ExecutionSlotSharingGroup newGroup = new ExecutionSlotSharingGroup(); - newGroup.setResourceProfile(slotSharingGroup.getResourceProfile()); + final ExecutionSlotSharingGroup newGroup = + new ExecutionSlotSharingGroup(slotSharingGroup); // Once a new ExecutionSlotSharingGroup is created, it's available for all JobVertices 
// in this SlotSharingGroup diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java new file mode 100644 index 00000000000..93f6e43ba81 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to achieve balanced task scheduling. Execution vertices that belong to + * the same SlotSharingGroup tend to be put evenly in each ExecutionSlotSharingGroup. Co-location + * constraints will be respected. 
+ */ +class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy { + + public static final Logger LOG = + LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class); + + TaskBalancedPreferredSlotSharingStrategy( + final SchedulingTopology topology, + final Set<SlotSharingGroup> slotSharingGroups, + final Set<CoLocationGroup> coLocationGroups) { + super(topology, slotSharingGroups, coLocationGroups); + } + + @Override + protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> computeExecutionSlotSharingGroups( + SchedulingTopology schedulingTopology) { + return new TaskBalancedExecutionSlotSharingGroupBuilder( + schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups) + .build(); + } + + static class Factory implements SlotSharingStrategy.Factory { + + public TaskBalancedPreferredSlotSharingStrategy create( + final SchedulingTopology topology, + final Set<SlotSharingGroup> slotSharingGroups, + final Set<CoLocationGroup> coLocationGroups) { + + return new TaskBalancedPreferredSlotSharingStrategy( + topology, slotSharingGroups, coLocationGroups); + } + } + + /** SlotSharingGroupBuilder class for balanced scheduling strategy. */ + private static class TaskBalancedExecutionSlotSharingGroupBuilder { + + private final SchedulingTopology topology; + + private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap; + + /** Record the {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. */ + private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> + paralleledExecutionSlotSharingGroupsMap; + + /** + * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link + * SlotSharingGroup}s. 
+ */ + private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap; + + private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> + executionSlotSharingGroupMap; + + private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap; + + private final Map<CoLocationConstraint, ExecutionSlotSharingGroup> + constraintToExecutionSlotSharingGroupMap; + + private TaskBalancedExecutionSlotSharingGroupBuilder( + final SchedulingTopology topology, + final Set<SlotSharingGroup> slotSharingGroups, + final Set<CoLocationGroup> coLocationGroups) { + this.topology = checkNotNull(topology); + + this.coLocationGroupMap = new HashMap<>(); + for (CoLocationGroup coLocationGroup : coLocationGroups) { + for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) { + coLocationGroupMap.put(jobVertexId, coLocationGroup); + } + } + + this.constraintToExecutionSlotSharingGroupMap = new HashMap<>(); + this.paralleledExecutionSlotSharingGroupsMap = new HashMap<>(slotSharingGroups.size()); + this.slotSharingGroupIndexMap = new HashMap<>(slotSharingGroups.size()); + this.slotSharingGroupMap = new HashMap<>(); + this.executionSlotSharingGroupMap = new HashMap<>(); + + for (SlotSharingGroup slotSharingGroup : slotSharingGroups) { + for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) { + slotSharingGroupMap.put(jobVertexId, slotSharingGroup); + } + } + } + + private Map<ExecutionVertexID, ExecutionSlotSharingGroup> build() { + + final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices = + getExecutionVertices(topology); + + initParalleledExecutionSlotSharingGroupsMap(allVertices); + + // Loop on job vertices + for (Map.Entry<JobVertexID, List<SchedulingExecutionVertex>> executionVertexInfos : + allVertices.entrySet()) { + + JobVertexID jobVertexID = executionVertexInfos.getKey(); + List<SchedulingExecutionVertex> executionVertices = executionVertexInfos.getValue(); + final SlotSharingGroup slotSharingGroup = 
slotSharingGroupMap.get(jobVertexID); + + if (!coLocationGroupMap.containsKey(jobVertexID)) { + // For vertices without CoLocationConstraint. + allocateNonCoLocatedVertices(slotSharingGroup, executionVertices); + } else { + // For vertices with CoLocationConstraint. + allocateCoLocatedVertices(slotSharingGroup, executionVertices); + } + } + return executionSlotSharingGroupMap; + } + + private void initParalleledExecutionSlotSharingGroupsMap( + final LinkedHashMap<JobVertexID, List<SchedulingExecutionVertex>> allVertices) { + + allVertices.entrySet().stream() + .map( + jobVertexExecutionVertices -> + Tuple2.of( + slotSharingGroupMap.get( + jobVertexExecutionVertices.getKey()), + jobVertexExecutionVertices.getValue().size())) + .collect( + Collectors.groupingBy( + tuple -> tuple.f0, + Collectors.summarizingInt(tuple -> tuple.f1))) + .forEach( + (slotSharingGroup, statistics) -> { + int slotNum = statistics.getMax(); + paralleledExecutionSlotSharingGroupsMap.put( + slotSharingGroup, + createExecutionSlotSharingGroups( + slotSharingGroup, slotNum)); + }); + } + + private List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups( + SlotSharingGroup slotSharingGroup, int slotNum) { + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = + new ArrayList<>(slotNum); + for (int i = 0; i < slotNum; i++) { + final ExecutionSlotSharingGroup executionSlotSharingGroup = + new ExecutionSlotSharingGroup(slotSharingGroup); + executionSlotSharingGroups.add(i, executionSlotSharingGroup); + LOG.debug( + "Create {}th executionSlotSharingGroup {}.", i, executionSlotSharingGroup); + } + return executionSlotSharingGroups; + } + + private void allocateCoLocatedVertices( + SlotSharingGroup slotSharingGroup, + List<SchedulingExecutionVertex> executionVertices) { + + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = + paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); + for (SchedulingExecutionVertex executionVertex : executionVertices) { + 
final CoLocationConstraint coLocationConstraint = + getCoLocationConstraint(executionVertex); + ExecutionSlotSharingGroup executionSlotSharingGroup = + constraintToExecutionSlotSharingGroupMap.get(coLocationConstraint); + if (Objects.isNull(executionSlotSharingGroup)) { + executionSlotSharingGroup = + executionSlotSharingGroups.get( + getLeastUtilizeSlotIndex( + executionSlotSharingGroups, executionVertex)); + constraintToExecutionSlotSharingGroupMap.put( + coLocationConstraint, executionSlotSharingGroup); + } + addVertexToExecutionSlotSharingGroup(executionSlotSharingGroup, executionVertex); + } + final int jobVertexParallel = executionVertices.size(); + if (!isMaxParallelism(jobVertexParallel, slotSharingGroup)) { + int index = getLeastUtilizeSlotIndex(executionSlotSharingGroups, null); + updateSlotRoundRobinIndexIfNeeded(jobVertexParallel, slotSharingGroup, index); + } + } + + private void allocateNonCoLocatedVertices( + SlotSharingGroup slotSharingGroup, + List<SchedulingExecutionVertex> executionVertices) { + final int jobVertexParallel = executionVertices.size(); + int index = getSlotRoundRobinIndex(jobVertexParallel, slotSharingGroup); + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = + paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); + for (SchedulingExecutionVertex executionVertex : executionVertices) { + addVertexToExecutionSlotSharingGroup( + executionSlotSharingGroups.get(index), executionVertex); + index = ++index % executionSlotSharingGroups.size(); + } + updateSlotRoundRobinIndexIfNeeded(executionVertices.size(), slotSharingGroup, index); + } + + private void addVertexToExecutionSlotSharingGroup( + ExecutionSlotSharingGroup executionSlotSharingGroup, + SchedulingExecutionVertex executionVertex) { + final ExecutionVertexID executionVertexId = executionVertex.getId(); + executionSlotSharingGroup.addVertex(executionVertexId); + executionSlotSharingGroupMap.put(executionVertexId, executionSlotSharingGroup); + } + + 
private CoLocationConstraint getCoLocationConstraint(SchedulingExecutionVertex sev) { + final JobVertexID jobVertexID = sev.getId().getJobVertexId(); + final int subtaskIndex = sev.getId().getSubtaskIndex(); + return coLocationGroupMap.get(jobVertexID).getLocationConstraint(subtaskIndex); + } + + private int getSlotRoundRobinIndex( + final int jobVertexParallelism, SlotSharingGroup slotSharingGroup) { + final boolean maxParallel = isMaxParallelism(jobVertexParallelism, slotSharingGroup); + return maxParallel ? 0 : slotSharingGroupIndexMap.getOrDefault(slotSharingGroup, 0); + } + + private void updateSlotRoundRobinIndexIfNeeded( + final int jobVertexParallelism, + final SlotSharingGroup slotSharingGroup, + final int nextIndex) { + if (!isMaxParallelism(jobVertexParallelism, slotSharingGroup)) { + slotSharingGroupIndexMap.put(slotSharingGroup, nextIndex); + } + } + + private boolean isMaxParallelism( + final int jobVertexParallelism, final SlotSharingGroup slotSharingGroup) { + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups = + paralleledExecutionSlotSharingGroupsMap.get(slotSharingGroup); + return jobVertexParallelism == executionSlotSharingGroups.size(); + } + + private int getLeastUtilizeSlotIndex( + final List<ExecutionSlotSharingGroup> executionSlotSharingGroups, + @Nullable final SchedulingExecutionVertex executionVertex) { + int indexWithLeastExecutionVertices = 0; + int leastExecutionVertices = Integer.MAX_VALUE; + for (int index = 0; index < executionSlotSharingGroups.size(); index++) { + final ExecutionSlotSharingGroup executionSlotSharingGroup = + executionSlotSharingGroups.get(index); + final int executionVertices = + executionSlotSharingGroup.getExecutionVertexIds().size(); + if (leastExecutionVertices > executionVertices + && (Objects.isNull(executionVertex) + || allocatable(executionSlotSharingGroup, executionVertex))) { + indexWithLeastExecutionVertices = index; + leastExecutionVertices = executionVertices; + } + } + return 
indexWithLeastExecutionVertices; + } + + private boolean allocatable( + final ExecutionSlotSharingGroup executionSlotSharingGroup, + @Nonnull SchedulingExecutionVertex executionVertex) { + final ExecutionVertexID executionVertexId = executionVertex.getId(); + final JobVertexID jobVertexId = executionVertexId.getJobVertexId(); + final Set<JobVertexID> allocatedJobVertices = + executionSlotSharingGroup.getExecutionVertexIds().stream() + .map(ExecutionVertexID::getJobVertexId) + .collect(Collectors.toSet()); + return !allocatedJobVertices.contains(jobVertexId); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategyTest.java new file mode 100644 index 00000000000..c0fc8ba94e2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/AbstractSlotSharingStrategyTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava32.com.google.common.collect.Sets; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Abstract test class base for {@link SlotSharingStrategy}. 
*/ +abstract class AbstractSlotSharingStrategyTest { + + protected TestingSchedulingTopology topology; + + protected JobVertexID jobVertexId1; + protected JobVertexID jobVertexId2; + + protected SlotSharingGroup slotSharingGroup; + protected SlotSharingGroup slotSharingGroup1; + protected SlotSharingGroup slotSharingGroup2; + + @BeforeEach + protected void setup() { + this.topology = new TestingSchedulingTopology(); + this.jobVertexId1 = new JobVertexID(); + this.jobVertexId2 = new JobVertexID(); + this.slotSharingGroup = new SlotSharingGroup(); + this.slotSharingGroup1 = new SlotSharingGroup(); + this.slotSharingGroup2 = new SlotSharingGroup(); + } + + @Test + void testSetSlotSharingGroupResource() { + final TestingSchedulingExecutionVertex ev10 = topology.newExecutionVertex(jobVertexId1, 0); + final TestingSchedulingExecutionVertex ev11 = topology.newExecutionVertex(jobVertexId1, 1); + final TestingSchedulingExecutionVertex ev20 = topology.newExecutionVertex(jobVertexId2, 0); + + final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10); + slotSharingGroup1.addVertexToGroup(jobVertexId1); + slotSharingGroup1.setResourceProfile(resourceProfile1); + + final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20); + slotSharingGroup2.addVertexToGroup(jobVertexId2); + slotSharingGroup2.setResourceProfile(resourceProfile2); + + final SlotSharingStrategy strategy = + getSlotSharingStrategy( + topology, + Sets.newHashSet(slotSharingGroup1, slotSharingGroup2), + Sets.newHashSet()); + + assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(3); + assertThat(strategy.getExecutionSlotSharingGroup(ev10.getId()).getResourceProfile()) + .isEqualTo(resourceProfile1); + assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getResourceProfile()) + .isEqualTo(resourceProfile1); + assertThat(strategy.getExecutionSlotSharingGroup(ev20.getId()).getResourceProfile()) + .isEqualTo(resourceProfile2); + } + + protected abstract 
SlotSharingStrategy getSlotSharingStrategy( + final SchedulingTopology topology, + final Set<SlotSharingGroup> slotSharingGroups, + final Set<CoLocationGroup> coLocationGroups); + + protected void renderTopology( + TestingSchedulingTopology topology, + List<TestingJobVertexInfo> mockedJobVertices, + List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> jobVertexInfos) { + for (TestingJobVertexInfo testingJobVertexInfo : mockedJobVertices) { + List<TestingSchedulingExecutionVertex> tSEVs = new ArrayList<>(); + for (int subIndex = 0; subIndex < testingJobVertexInfo.parallelism; subIndex++) { + tSEVs.add( + topology.newExecutionVertex( + testingJobVertexInfo.jobVertex.getID(), subIndex)); + } + jobVertexInfos.add(Tuple2.of(testingJobVertexInfo.jobVertex.getID(), tSEVs)); + } + } + + /** Util class to represent the simple job vertex information. */ + protected static class TestingJobVertexInfo { + final JobVertex jobVertex = new JobVertex(null, new JobVertexID()); + @Nonnull SlotSharingGroup slotSharingGroup; + @Nullable CoLocationGroup coLocationGroup; + int parallelism; + + public TestingJobVertexInfo( + int parallelism, + @Nonnull SlotSharingGroup slotSharingGroup, + @Nullable CoLocationGroup coLocationGroup) { + Preconditions.checkArgument(parallelism > 0); + this.parallelism = parallelism; + this.slotSharingGroup = slotSharingGroup; + this.coLocationGroup = coLocationGroup; + + this.slotSharingGroup.addVertexToGroup(jobVertex.getID()); + if (this.coLocationGroup != null) { + ((CoLocationGroupImpl) this.coLocationGroup).addVertex(jobVertex); + } + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java index dcdaa96c947..829c4865a81 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.scheduler; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; @@ -33,15 +33,17 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; -import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; -import org.junit.jupiter.api.BeforeEach; +import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava32.com.google.common.collect.Sets; + import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -53,83 +55,41 @@ import java.util.concurrent.ScheduledExecutorService; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link LocalInputPreferredSlotSharingStrategy}. 
*/ -class LocalInputPreferredSlotSharingStrategyTest { +class LocalInputPreferredSlotSharingStrategyTest extends AbstractSlotSharingStrategyTest { @RegisterExtension private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension(); - private TestingSchedulingTopology topology; - - private static final JobVertexID JOB_VERTEX_ID_1 = new JobVertexID(); - private static final JobVertexID JOB_VERTEX_ID_2 = new JobVertexID(); + private final JobVertexID jobVertexId3 = new JobVertexID(); private TestingSchedulingExecutionVertex ev11; private TestingSchedulingExecutionVertex ev12; private TestingSchedulingExecutionVertex ev21; private TestingSchedulingExecutionVertex ev22; - - private Set<SlotSharingGroup> slotSharingGroups; - - @BeforeEach - void setUp() { - topology = new TestingSchedulingTopology(); - - ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0); - ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1); - - ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0); - ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1); - - final SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); - slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1); - slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2); - slotSharingGroups = Collections.singleton(slotSharingGroup); - } - - @Test - void testCoLocationConstraintIsRespected() { - topology.connect(ev11, ev22); - topology.connect(ev12, ev21); - - final CoLocationGroup coLocationGroup = - new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2); - final Set<CoLocationGroup> coLocationGroups = Collections.singleton(coLocationGroup); - - final SlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, slotSharingGroups, coLocationGroups); - - assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2); - assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds()) - .contains(ev11.getId(), 
ev21.getId()); - assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds()) - .contains(ev12.getId(), ev22.getId()); + private TestingSchedulingExecutionVertex ev23; + + @Override + protected SlotSharingStrategy getSlotSharingStrategy( + SchedulingTopology topology, + Set<SlotSharingGroup> slotSharingGroups, + Set<CoLocationGroup> coLocationGroups) { + return new LocalInputPreferredSlotSharingStrategy( + topology, slotSharingGroups, coLocationGroups); } @Test void testInputLocalityIsRespectedWithRescaleEdge() { - final TestingSchedulingTopology topology = new TestingSchedulingTopology(); - - final TestingSchedulingExecutionVertex ev11 = - topology.newExecutionVertex(JOB_VERTEX_ID_1, 0); - final TestingSchedulingExecutionVertex ev12 = - topology.newExecutionVertex(JOB_VERTEX_ID_1, 1); - - final TestingSchedulingExecutionVertex ev21 = - topology.newExecutionVertex(JOB_VERTEX_ID_2, 0); - final TestingSchedulingExecutionVertex ev22 = - topology.newExecutionVertex(JOB_VERTEX_ID_2, 1); - final TestingSchedulingExecutionVertex ev23 = - topology.newExecutionVertex(JOB_VERTEX_ID_2, 2); + createTwoExeVerticesPerJv1AndJv2(slotSharingGroup); + ev23 = topology.newExecutionVertex(jobVertexId2, 2); topology.connect(ev11, ev21); topology.connect(ev11, ev22); topology.connect(ev12, ev23); final SlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, slotSharingGroups, Collections.emptySet()); + getSlotSharingStrategy( + topology, Sets.newHashSet(slotSharingGroup), Collections.emptySet()); assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(3); assertThat(strategy.getExecutionSlotSharingGroup(ev21.getId()).getExecutionVertexIds()) @@ -140,19 +100,30 @@ class LocalInputPreferredSlotSharingStrategyTest { .contains(ev12.getId(), ev23.getId()); } + private void createTwoExeVerticesPerJv1AndJv2(SlotSharingGroup sharingGroup) { + ev11 = topology.newExecutionVertex(jobVertexId1, 0); + ev12 = 
topology.newExecutionVertex(jobVertexId1, 1); + + ev21 = topology.newExecutionVertex(jobVertexId2, 0); + ev22 = topology.newExecutionVertex(jobVertexId2, 1); + sharingGroup.addVertexToGroup(jobVertexId1); + sharingGroup.addVertexToGroup(jobVertexId2); + } + @Test void testInputLocalityIsRespectedWithAllToAllEdge() { - final TestingSchedulingTopology topology = new TestingSchedulingTopology(); + slotSharingGroup.addVertexToGroup(jobVertexId1); + slotSharingGroup.addVertexToGroup(jobVertexId2); final List<TestingSchedulingExecutionVertex> producer = topology.addExecutionVertices() .withParallelism(2) - .withJobVertexID(JOB_VERTEX_ID_1) + .withJobVertexID(jobVertexId1) .finish(); final List<TestingSchedulingExecutionVertex> consumer = topology.addExecutionVertices() .withParallelism(2) - .withJobVertexID(JOB_VERTEX_ID_2) + .withJobVertexID(jobVertexId2) .finish(); topology.connectAllToAll(producer, consumer) @@ -166,8 +137,8 @@ class LocalInputPreferredSlotSharingStrategyTest { ev22 = consumer.get(1); final SlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, slotSharingGroups, Collections.emptySet()); + getSlotSharingStrategy( + topology, Sets.newHashSet(slotSharingGroup), Collections.emptySet()); assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2); assertThat(strategy.getExecutionSlotSharingGroup(ev21.getId()).getExecutionVertexIds()) .contains(ev11.getId(), ev21.getId()); @@ -175,11 +146,56 @@ class LocalInputPreferredSlotSharingStrategyTest { .contains(ev12.getId(), ev22.getId()); } + @Test + void testCoLocationConstraintIsRespected() { + List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> jobVertexInfos = + new ArrayList<>(); + CoLocationGroup coLocationGroup1 = new CoLocationGroupImpl(); + CoLocationGroup coLocationGroup2 = new CoLocationGroupImpl(); + List<TestingJobVertexInfo> mockedJobVertices = + Lists.newArrayList( + new TestingJobVertexInfo(1, slotSharingGroup, null), + new 
TestingJobVertexInfo(2, slotSharingGroup, coLocationGroup1), + new TestingJobVertexInfo(2, slotSharingGroup, coLocationGroup1), + new TestingJobVertexInfo(3, slotSharingGroup, coLocationGroup2), + new TestingJobVertexInfo(3, slotSharingGroup, coLocationGroup2)); + renderTopology(topology, mockedJobVertices, jobVertexInfos); + + final SlotSharingStrategy strategy = + getSlotSharingStrategy( + topology, + Sets.newHashSet(slotSharingGroup), + Sets.newHashSet(coLocationGroup1, coLocationGroup2)); + List<TestingSchedulingExecutionVertex> executionVertices1 = jobVertexInfos.get(1).f1; + List<TestingSchedulingExecutionVertex> executionVertices2 = jobVertexInfos.get(2).f1; + + assertThat(executionVertices1).hasSameSizeAs(executionVertices2); + for (int i = 0; i < executionVertices1.size(); i++) { + ExecutionSlotSharingGroup executionSlotSharingGroup = + strategy.getExecutionSlotSharingGroup(executionVertices1.get(i).getId()); + assertThat(executionSlotSharingGroup) + .isEqualTo( + strategy.getExecutionSlotSharingGroup( + executionVertices2.get(i).getId())); + } + + List<TestingSchedulingExecutionVertex> executionVertices3 = jobVertexInfos.get(3).f1; + List<TestingSchedulingExecutionVertex> executionVertices4 = jobVertexInfos.get(4).f1; + assertThat(executionVertices3).hasSameSizeAs(executionVertices4); + for (int i = 0; i < executionVertices3.size(); i++) { + assertThat(strategy.getExecutionSlotSharingGroup(executionVertices3.get(i).getId())) + .isEqualTo( + strategy.getExecutionSlotSharingGroup( + executionVertices4.get(i).getId())); + } + } + @Test void testDisjointVerticesInOneGroup() { + createTwoExeVerticesPerJv1AndJv2(slotSharingGroup); final SlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, slotSharingGroups, Collections.emptySet()); + getSlotSharingStrategy( + topology, Sets.newHashSet(slotSharingGroup), Collections.emptySet()); assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2); 
assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds()) @@ -190,18 +206,20 @@ class LocalInputPreferredSlotSharingStrategyTest { @Test void testVerticesInDifferentSlotSharingGroups() { - final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup(); - slotSharingGroup1.addVertexToGroup(JOB_VERTEX_ID_1); - final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup(); - slotSharingGroup2.addVertexToGroup(JOB_VERTEX_ID_2); - final Set<SlotSharingGroup> slotSharingGroups = new HashSet<>(); - slotSharingGroups.add(slotSharingGroup1); - slotSharingGroups.add(slotSharingGroup2); + ev11 = topology.newExecutionVertex(jobVertexId1, 0); + ev12 = topology.newExecutionVertex(jobVertexId1, 1); + ev21 = topology.newExecutionVertex(jobVertexId2, 0); + ev22 = topology.newExecutionVertex(jobVertexId2, 1); + + slotSharingGroup1.addVertexToGroup(jobVertexId1); + slotSharingGroup2.addVertexToGroup(jobVertexId2); final SlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, slotSharingGroups, Collections.emptySet()); + getSlotSharingStrategy( + topology, + Sets.newHashSet(slotSharingGroup1, slotSharingGroup2), + Collections.emptySet()); assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(4); assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds()) @@ -214,46 +232,17 @@ class LocalInputPreferredSlotSharingStrategyTest { .contains(ev22.getId()); } - @Test - void testSetSlotSharingGroupResource() { - final SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup(); - final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10); - slotSharingGroup1.addVertexToGroup(JOB_VERTEX_ID_1); - slotSharingGroup1.setResourceProfile(resourceProfile1); - final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup(); - final ResourceProfile resourceProfile2 = ResourceProfile.fromResources(2, 20); - slotSharingGroup2.addVertexToGroup(JOB_VERTEX_ID_2); - 
slotSharingGroup2.setResourceProfile(resourceProfile2); - - final Set<SlotSharingGroup> slotSharingGroups = new HashSet<>(); - slotSharingGroups.add(slotSharingGroup1); - slotSharingGroups.add(slotSharingGroup2); - - final SlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, slotSharingGroups, Collections.emptySet()); - - assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(4); - assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getResourceProfile()) - .isEqualTo(resourceProfile1); - assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getResourceProfile()) - .isEqualTo(resourceProfile1); - assertThat(strategy.getExecutionSlotSharingGroup(ev21.getId()).getResourceProfile()) - .isEqualTo(resourceProfile2); - assertThat(strategy.getExecutionSlotSharingGroup(ev22.getId()).getResourceProfile()) - .isEqualTo(resourceProfile2); - } - /** * In this test case, there are two JobEdges between two JobVertices. There will be no * ExecutionSlotSharingGroup that contains two vertices with the same JobVertexID. 
*/ @Test void testInputLocalityIsRespectedWithTwoEdgesBetweenTwoVertices() throws Exception { + createTwoExeVerticesPerJv1AndJv2(slotSharingGroup); int parallelism = 4; - JobVertex v1 = createJobVertex("v1", JOB_VERTEX_ID_1, parallelism); - JobVertex v2 = createJobVertex("v2", JOB_VERTEX_ID_2, parallelism); + JobVertex v1 = createJobVertex("v1", jobVertexId1, parallelism); + JobVertex v2 = createJobVertex("v2", jobVertexId2, parallelism); v2.connectNewDataSetAsInput( v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); @@ -271,17 +260,15 @@ class LocalInputPreferredSlotSharingStrategyTest { final SchedulingTopology topology = executionGraph.getSchedulingTopology(); final SlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, slotSharingGroups, Collections.emptySet()); + getSlotSharingStrategy( + topology, Sets.newHashSet(slotSharingGroup), Collections.emptySet()); assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(4); ExecutionVertex[] ev1 = - Objects.requireNonNull(executionGraph.getJobVertex(JOB_VERTEX_ID_1)) - .getTaskVertices(); + Objects.requireNonNull(executionGraph.getJobVertex(jobVertexId1)).getTaskVertices(); ExecutionVertex[] ev2 = - Objects.requireNonNull(executionGraph.getJobVertex(JOB_VERTEX_ID_2)) - .getTaskVertices(); + Objects.requireNonNull(executionGraph.getJobVertex(jobVertexId2)).getTaskVertices(); for (int i = 0; i < parallelism; i++) { assertThat( strategy.getExecutionSlotSharingGroup(ev1[i].getID()) @@ -292,29 +279,20 @@ class LocalInputPreferredSlotSharingStrategyTest { @Test void testGetExecutionSlotSharingGroupOfLateAttachedVertices() { + slotSharingGroup1.addVertexToGroup(jobVertexId1); + slotSharingGroup1.addVertexToGroup(jobVertexId2); + slotSharingGroup2.addVertexToGroup(jobVertexId3); - JobVertexID jobVertexID1 = new JobVertexID(); - JobVertexID jobVertexID2 = new JobVertexID(); - JobVertexID jobVertexID3 = new JobVertexID(); - - final SlotSharingGroup slotSharingGroup1 = new 
SlotSharingGroup(); - slotSharingGroup1.addVertexToGroup(jobVertexID1); - slotSharingGroup1.addVertexToGroup(jobVertexID2); - - final SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup(); - slotSharingGroup2.addVertexToGroup(jobVertexID3); - - TestingSchedulingTopology topology = new TestingSchedulingTopology(); - - TestingSchedulingExecutionVertex ev1 = topology.newExecutionVertex(jobVertexID1, 0); - TestingSchedulingExecutionVertex ev2 = topology.newExecutionVertex(jobVertexID2, 0); + TestingSchedulingExecutionVertex ev1 = topology.newExecutionVertex(jobVertexId1, 0); + TestingSchedulingExecutionVertex ev2 = topology.newExecutionVertex(jobVertexId2, 0); topology.connect(ev1, ev2); final LocalInputPreferredSlotSharingStrategy strategy = - new LocalInputPreferredSlotSharingStrategy( - topology, - new HashSet<>(Arrays.asList(slotSharingGroup1, slotSharingGroup2)), - Collections.emptySet()); + (LocalInputPreferredSlotSharingStrategy) + getSlotSharingStrategy( + topology, + new HashSet<>(Arrays.asList(slotSharingGroup1, slotSharingGroup2)), + Collections.emptySet()); assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(1); assertThat(strategy.getExecutionSlotSharingGroup(ev1.getId()).getExecutionVertexIds()) @@ -323,7 +301,7 @@ class LocalInputPreferredSlotSharingStrategyTest { .contains(ev1.getId(), ev2.getId()); // add new job vertices and notify scheduling topology updated - TestingSchedulingExecutionVertex ev3 = topology.newExecutionVertex(jobVertexID3, 0); + TestingSchedulingExecutionVertex ev3 = topology.newExecutionVertex(jobVertexId3, 0); topology.connect(ev2, ev3, ResultPartitionType.BLOCKING); strategy.notifySchedulingTopologyUpdated(topology, Collections.singletonList(ev3.getId())); @@ -343,18 +321,4 @@ class LocalInputPreferredSlotSharingStrategyTest { jobVertex.setInvokableClass(NoOpInvokable.class); return jobVertex; } - - private static class TestingCoLocationGroup extends CoLocationGroupImpl { - - private final List<JobVertexID> 
vertexIDs; - - private TestingCoLocationGroup(JobVertexID... vertexIDs) { - this.vertexIDs = Arrays.asList(vertexIDs); - } - - @Override - public List<JobVertexID> getVertexIds() { - return vertexIDs; - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java index 26d96ba189f..68d4361c4f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.FlinkRuntimeException; @@ -64,7 +65,8 @@ class MergingSharedSlotProfileRetrieverTest { SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile( - new ExecutionSlotSharingGroup(), ResourceProfile.ZERO); + new ExecutionSlotSharingGroup(new SlotSharingGroup()), + ResourceProfile.ZERO); assertThat(slotProfile.getTaskResourceProfile()).isEqualTo(ResourceProfile.ZERO); assertThat(slotProfile.getPhysicalSlotResourceProfile()).isEqualTo(ResourceProfile.ZERO); @@ -197,7 +199,8 @@ class MergingSharedSlotProfileRetrieverTest { () -> new HashSet<>(reservedAllocationIds)) .createFromBulk(new HashSet<>(executions)); - ExecutionSlotSharingGroup executionSlotSharingGroup = new ExecutionSlotSharingGroup(); + ExecutionSlotSharingGroup executionSlotSharingGroup = + new 
ExecutionSlotSharingGroup(new SlotSharingGroup()); executions.stream() .limit(executionSlotSharingGroupSize) .forEach(executionSlotSharingGroup::addVertex); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTestingUtils.java index 39a1580221e..c3126b17c51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTestingUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.scheduler; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; class SharedSlotTestingUtils { @@ -25,7 +26,7 @@ class SharedSlotTestingUtils { static ExecutionSlotSharingGroup createExecutionSlotSharingGroup( ExecutionVertexID... executions) { - ExecutionSlotSharingGroup group = new ExecutionSlotSharingGroup(); + ExecutionSlotSharingGroup group = new ExecutionSlotSharingGroup(new SlotSharingGroup()); for (ExecutionVertexID execution : executions) { group.addVertex(execution); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java index edab30c0afc..a73ab7ed859 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfileTestingUtils; import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingPayload; @@ -642,9 +643,10 @@ class SlotSharingExecutionSlotAllocatorTest { new HashMap<>(); for (Map.Entry<ExecutionVertexID[], ResourceProfile> groupAndResource : groupAndResources.entrySet()) { + SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); + slotSharingGroup.setResourceProfile(groupAndResource.getValue()); ExecutionSlotSharingGroup executionSlotSharingGroup = - new ExecutionSlotSharingGroup(); - executionSlotSharingGroup.setResourceProfile(groupAndResource.getValue()); + new ExecutionSlotSharingGroup(slotSharingGroup); for (ExecutionVertexID executionVertexId : groupAndResource.getKey()) { executionSlotSharingGroup.addVertex(executionVertexId); executionSlotSharingGroups.put(executionVertexId, executionSlotSharingGroup); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java new file mode 100644 index 00000000000..c975f238e6e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; + +import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava32.com.google.common.collect.Sets; + +import org.assertj.core.data.Offset; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. 
*/ +class TaskBalancedPreferredSlotSharingStrategyTest extends AbstractSlotSharingStrategyTest { + + @Override + protected SlotSharingStrategy getSlotSharingStrategy( + SchedulingTopology topology, + Set<SlotSharingGroup> slotSharingGroups, + Set<CoLocationGroup> coLocationGroups) { + return new TaskBalancedPreferredSlotSharingStrategy( + topology, slotSharingGroups, coLocationGroups); + } + + @Test + void testVerticesInDifferentSlotSharingGroups() { + List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> jobVertexInfos = + new ArrayList<>(); + List<TestingJobVertexInfo> testingJobVertexInfos = + Lists.newArrayList( + new TestingJobVertexInfo(1, slotSharingGroup1, null), + new TestingJobVertexInfo(2, slotSharingGroup1, null), + new TestingJobVertexInfo(3, slotSharingGroup1, null), + new TestingJobVertexInfo(1, slotSharingGroup2, null), + new TestingJobVertexInfo(2, slotSharingGroup2, null), + new TestingJobVertexInfo(2, slotSharingGroup2, null)); + + renderTopology(topology, testingJobVertexInfos, jobVertexInfos); + final SlotSharingStrategy strategy = + getSlotSharingStrategy( + topology, + Sets.newHashSet(slotSharingGroup1, slotSharingGroup2), + Sets.newHashSet()); + assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(5); + checkBalanceAtSlotsLevelWithoutCoLocation(strategy); + + List<TestingSchedulingExecutionVertex> executionVertices4 = jobVertexInfos.get(4).f1; + List<TestingSchedulingExecutionVertex> executionVertices5 = jobVertexInfos.get(5).f1; + assertThat(executionVertices4).hasSameSizeAs(executionVertices5); + // Check for JVs whose parallelism is the max in the same slot sharing group. 
+ for (int i = 0; i < executionVertices4.size(); i++) { + TestingSchedulingExecutionVertex executionVertex4 = executionVertices4.get(i); + assertThat(strategy.getExecutionSlotSharingGroup(executionVertex4.getId())) + .isEqualTo( + strategy.getExecutionSlotSharingGroup( + executionVertices5.get(i).getId())); + } + } + + private void checkBalanceAtSlotsLevelWithoutCoLocation(SlotSharingStrategy strategy) { + strategy.getExecutionSlotSharingGroups().stream() + .collect(Collectors.groupingBy(ExecutionSlotSharingGroup::getSlotSharingGroup)) + .forEach( + (slotSharingGroup, executionSlotSharingGroups) -> { + Optional<Integer> max = + executionSlotSharingGroups.stream() + .map( + executionSlotSharingGroup -> + executionSlotSharingGroup + .getExecutionVertexIds() + .size()) + .max(Comparator.comparing(i -> i)); + Optional<Integer> min = + executionSlotSharingGroups.stream() + .map( + executionSlotSharingGroup -> + executionSlotSharingGroup + .getExecutionVertexIds() + .size()) + .min(Comparator.comparing(i -> i)); + assertThat(max.get()).isCloseTo(min.get(), Offset.offset(1)); + }); + } + + @Test + void testCoLocationConstraintIsRespected() { + List<Tuple2<JobVertexID, List<TestingSchedulingExecutionVertex>>> jobVertexInfos = + new ArrayList<>(); + CoLocationGroup coLocationGroup1 = new CoLocationGroupImpl(); + CoLocationGroup coLocationGroup2 = new CoLocationGroupImpl(); + TestingJobVertexInfo tJv0 = new TestingJobVertexInfo(1, slotSharingGroup, null); + TestingJobVertexInfo tJv1 = new TestingJobVertexInfo(2, slotSharingGroup, coLocationGroup1); + TestingJobVertexInfo tJv2 = new TestingJobVertexInfo(2, slotSharingGroup, coLocationGroup1); + TestingJobVertexInfo tJv3 = new TestingJobVertexInfo(1, slotSharingGroup, null); + TestingJobVertexInfo tJv4 = new TestingJobVertexInfo(4, slotSharingGroup, coLocationGroup1); + TestingJobVertexInfo tJv5 = new TestingJobVertexInfo(4, slotSharingGroup, coLocationGroup1); + TestingJobVertexInfo tJv6 = new TestingJobVertexInfo(3, 
slotSharingGroup, coLocationGroup2); + List<TestingJobVertexInfo> mockedJobVertices = + Lists.newArrayList(tJv0, tJv1, tJv2, tJv3, tJv4, tJv5, tJv6); + renderTopology(topology, mockedJobVertices, jobVertexInfos); + SlotSharingStrategy strategy = + getSlotSharingStrategy( + topology, + Sets.newHashSet(slotSharingGroup), + Sets.newHashSet(coLocationGroup1, coLocationGroup2)); + + List<TestingSchedulingExecutionVertex> jv0Vertices = jobVertexInfos.get(0).f1; + List<TestingSchedulingExecutionVertex> jv1Vertices = jobVertexInfos.get(1).f1; + List<TestingSchedulingExecutionVertex> jv2Vertices = jobVertexInfos.get(2).f1; + List<TestingSchedulingExecutionVertex> jv4Vertices = jobVertexInfos.get(4).f1; + List<TestingSchedulingExecutionVertex> jv5Vertices = jobVertexInfos.get(5).f1; + List<TestingSchedulingExecutionVertex> jv6Vertices = jobVertexInfos.get(6).f1; + // Check vertices of jv1 & jv2 + for (int i = 0; i < jv1Vertices.size(); i++) { + assertThat(getTargetGroup(strategy, jv1Vertices, i)) + .isEqualTo(getTargetGroup(strategy, jv2Vertices, i)) + .isEqualTo(getTargetGroup(strategy, jv4Vertices, i)) + .isEqualTo(getTargetGroup(strategy, jv5Vertices, i)); + } + // Check vertices of jv4 & jv5 + for (int i = 0; i < jv4Vertices.size(); i++) { + assertThat(getTargetGroup(strategy, jv4Vertices, i)) + .isEqualTo(getTargetGroup(strategy, jv5Vertices, i)); + } + // Check for tJv4 + assertThat(getTargetGroup(strategy, jv4Vertices, 2)) + .isNotEqualTo(getTargetGroup(strategy, jv1Vertices, 0)) + .isNotEqualTo(getTargetGroup(strategy, jv1Vertices, 1)); + assertThat(getTargetGroup(strategy, jv4Vertices, 3)) + .isNotEqualTo(getTargetGroup(strategy, jv1Vertices, 0)) + .isNotEqualTo(getTargetGroup(strategy, jv1Vertices, 1)); + // Check for tJv6 + assertThat(getTargetGroup(strategy, jv6Vertices, 0)) + .isEqualTo(getTargetGroup(strategy, jv0Vertices, 0)); + assertThat(getTargetGroup(strategy, jv6Vertices, 1)) + .isEqualTo(getTargetGroup(strategy, jv4Vertices, 3)); + 
assertThat(getTargetGroup(strategy, jv6Vertices, 2)) + .isEqualTo(getTargetGroup(strategy, jv4Vertices, 0)); + } + + private ExecutionSlotSharingGroup getTargetGroup( + SlotSharingStrategy strategy, + List<TestingSchedulingExecutionVertex> jvVertices, + int index) { + return strategy.getExecutionSlotSharingGroup(jvVertices.get(index).getId()); + } +}
