This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e38a6709b57000e38bf044d0e55da3dd3ec3bde8 Author: David Moravek <[email protected]> AuthorDate: Fri Jan 21 10:09:53 2022 +0100 [FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler --- .../scheduler/adaptive/AdaptiveScheduler.java | 16 +- .../scheduler/adaptive/JobGraphJobInformation.java | 5 + .../adaptive/allocator/DefaultSlotAssigner.java | 77 ++++++++ .../allocator/JobAllocationsInformation.java | 112 ++++++++++++ .../adaptive/allocator/JobInformation.java | 2 + .../adaptive/allocator/SlotAllocator.java | 4 +- .../{JobInformation.java => SlotAssigner.java} | 36 ++-- .../allocator/SlotSharingSlotAllocator.java | 70 +++---- .../allocator/StateLocalitySlotAssigner.java | 201 +++++++++++++++++++++ .../allocator/SlotSharingSlotAllocatorTest.java | 161 +++++++++-------- .../allocator/StateLocalitySlotAssignerTest.java | 166 +++++++++++++++++ .../adaptive/allocator/TestJobInformation.java | 59 ++++++ .../scheduler/adaptive/allocator/TestSlotInfo.java | 10 +- .../adaptive/allocator/TestVertexInformation.java} | 49 ++--- .../adaptive/allocator/TestingSlotAllocator.java | 31 +--- 15 files changed, 802 insertions(+), 197 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index a876bb65eb2..2446d41c5c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -96,6 +96,7 @@ import org.apache.flink.runtime.scheduler.SchedulerUtils; import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation; import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; @@ -760,12 +761,15 @@ public class AdaptiveScheduler .isPresent(); } - private JobSchedulingPlan determineParallelism(SlotAllocator slotAllocator) + private JobSchedulingPlan determineParallelism( + SlotAllocator slotAllocator, @Nullable ExecutionGraph previousExecutionGraph) throws NoResourceAvailableException { return slotAllocator .determineParallelismAndCalculateAssignment( - jobInformation, declarativeSlotPool.getFreeSlotsInformation()) + jobInformation, + declarativeSlotPool.getFreeSlotsInformation(), + JobAllocationsInformation.fromGraph(previousExecutionGraph)) .orElseThrow( () -> new NoResourceAvailableException( @@ -921,8 +925,7 @@ public class AdaptiveScheduler public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) { final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> executionGraphWithAvailableResourcesFuture = - createExecutionGraphWithAvailableResourcesAsync(); - + createExecutionGraphWithAvailableResourcesAsync(previousExecutionGraph); transitionToState( new CreatingExecutionGraph.Factory( this, @@ -932,12 +935,13 @@ public class AdaptiveScheduler } private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> - createExecutionGraphWithAvailableResourcesAsync() { + createExecutionGraphWithAvailableResourcesAsync( + @Nullable ExecutionGraph previousExecutionGraph) { final JobSchedulingPlan schedulingPlan; final VertexParallelismStore adjustedParallelismStore; try { - schedulingPlan = determineParallelism(slotAllocator); + schedulingPlan = determineParallelism(slotAllocator, previousExecutionGraph); JobGraph adjustedJobGraph = jobInformation.copyJobGraph(); for (JobVertex vertex : adjustedJobGraph.getVertices()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java index 1b0bb60e7e6..2111b6e9c07 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/JobGraphJobInformation.java @@ -106,6 +106,11 @@ public class JobGraphJobInformation implements JobInformation { return parallelismInfo.getParallelism(); } + @Override + public int getMaxParallelism() { + return parallelismInfo.getMaxParallelism(); + } + @Override public SlotSharingGroup getSlotSharingGroup() { return jobVertex.getSlotSharingGroup(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java new file mode 100644 index 00000000000..0a8813b33a6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/DefaultSlotAssigner.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** Simple {@link SlotAssigner} that treats all slots and slot sharing groups equally. */ +public class DefaultSlotAssigner implements SlotAssigner { + + @Override + public Collection<SlotAssignment> assignSlots( + JobInformation jobInformation, + Collection<? extends SlotInfo> freeSlots, + VertexParallelism vertexParallelism, + JobAllocationsInformation previousAllocations) { + List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>(); + for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { + allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup)); + } + + Iterator<? extends SlotInfo> iterator = freeSlots.iterator(); + Collection<SlotAssignment> assignments = new ArrayList<>(); + for (ExecutionSlotSharingGroup group : allGroups) { + assignments.add(new SlotAssignment(iterator.next(), group)); + } + return assignments; + } + + static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups( + VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) { + final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>(); + slotSharingGroup + .getJobVertexIds() + .forEach( + jobVertexId -> { + int parallelism = vertexParallelism.getParallelism(jobVertexId); + for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) { + sharedSlotToVertexAssignment + .computeIfAbsent(subtaskIdx, ignored -> new HashSet<>()) + .add(new ExecutionVertexID(jobVertexId, subtaskIdx)); + } + }); + return sharedSlotToVertexAssignment.values().stream() + .map(ExecutionSlotSharingGroup::new) + .collect(Collectors.toList()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java new file mode 100644 index 00000000000..e17d242d342 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobAllocationsInformation.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; + +/** Information about allocations of Job Vertices. */ +@Internal +public class JobAllocationsInformation { + + private final Map<JobVertexID, List<VertexAllocationInformation>> vertexAllocations; + + JobAllocationsInformation( + Map<JobVertexID, List<VertexAllocationInformation>> vertexAllocations) { + this.vertexAllocations = vertexAllocations; + } + + public static JobAllocationsInformation fromGraph(@Nullable ExecutionGraph graph) { + return graph == null ? empty() : new JobAllocationsInformation(calculateAllocations(graph)); + } + + public List<VertexAllocationInformation> getAllocations(JobVertexID jobVertexID) { + return vertexAllocations.getOrDefault(jobVertexID, emptyList()); + } + + private static Map<JobVertexID, List<VertexAllocationInformation>> calculateAllocations( + ExecutionGraph graph) { + final Map<JobVertexID, List<VertexAllocationInformation>> allocations = new HashMap<>(); + for (ExecutionJobVertex vertex : graph.getVerticesTopologically()) { + JobVertexID jobVertexId = vertex.getJobVertexId(); + for (ExecutionVertex executionVertex : vertex.getTaskVertices()) { + AllocationID allocationId = + executionVertex.getCurrentExecutionAttempt().getAssignedAllocationID(); + KeyGroupRange kgr = + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( + vertex.getMaxParallelism(), + vertex.getParallelism(), + executionVertex.getParallelSubtaskIndex()); + allocations + .computeIfAbsent(jobVertexId, ignored -> new ArrayList<>()) + .add(new VertexAllocationInformation(allocationId, jobVertexId, kgr)); + } + } + return allocations; + } + + public static JobAllocationsInformation empty() { + return new JobAllocationsInformation(emptyMap()); + } + + public boolean isEmpty() { + return vertexAllocations.isEmpty(); + } + + /** Information about the allocations of a single Job Vertex. */ + public static class VertexAllocationInformation { + private final AllocationID allocationID; + private final JobVertexID jobVertexID; + private final KeyGroupRange keyGroupRange; + + public VertexAllocationInformation( + AllocationID allocationID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange) { + this.allocationID = allocationID; + this.jobVertexID = jobVertexID; + this.keyGroupRange = keyGroupRange; + } + + public AllocationID getAllocationID() { + return allocationID; + } + + public JobVertexID getJobVertexID() { + return jobVertexID; + } + + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java index 99b8d48c051..b1ff9183b6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java @@ -44,6 +44,8 @@ public interface JobInformation { int getParallelism(); + int getMaxParallelism(); + SlotSharingGroup getSlotSharingGroup(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java index fea0d659ad3..629a57f5122 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java @@ -60,7 +60,9 @@ public interface SlotAllocator { * assignment of slots to execution slot sharing groups. */ Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment( - JobInformation jobInformation, Collection<? extends SlotInfo> slots); + JobInformation jobInformation, + Collection<? extends SlotInfo> slots, + JobAllocationsInformation jobAllocationsInformation); /** * Reserves slots according to the given assignment if possible. If the underlying set of diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java similarity index 52% copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java index 99b8d48c051..cb264e28d88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAssigner.java @@ -17,33 +17,19 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; import java.util.Collection; -/** Information about the job. */ -public interface JobInformation { - /** - * Returns all slot-sharing groups of the job. - * - * <p>Attention: The returned slot sharing groups should never be modified (they are indeed - * mutable)! - * - * @return all slot-sharing groups of the job - */ - Collection<SlotSharingGroup> getSlotSharingGroups(); +/** Interface for assigning slots to slot sharing groups. */ +@Internal +public interface SlotAssigner { - VertexInformation getVertexInformation(JobVertexID jobVertexId); - - Iterable<VertexInformation> getVertices(); - - /** Information about a single vertex. */ - interface VertexInformation { - JobVertexID getJobVertexID(); - - int getParallelism(); - - SlotSharingGroup getSlotSharingGroup(); - } + Collection<SlotAssignment> assignSlots( + JobInformation jobInformation, + Collection<? extends SlotInfo> freeSlots, + VertexParallelism vertexParallelism, + JobAllocationsInformation previousAllocations); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java index 2b7a9c362ad..3d0f30b69e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java @@ -37,12 +37,11 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; /** {@link SlotAllocator} implementation that supports slot sharing. */ @@ -125,30 +124,24 @@ public class SlotSharingSlotAllocator implements SlotAllocator { @Override public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment( - JobInformation jobInformation, Collection<? extends SlotInfo> slots) { + JobInformation jobInformation, + Collection<? extends SlotInfo> slots, + JobAllocationsInformation jobAllocationsInformation) { return determineParallelism(jobInformation, slots) .map( - parallelism -> - new JobSchedulingPlan( - parallelism, - assignSlots(jobInformation, slots, parallelism))); - } - - private Collection<SlotAssignment> assignSlots( - JobInformation jobInformation, - Collection<? extends SlotInfo> freeSlots, - VertexParallelism vertexParallelism) { - List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>(); - for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { - allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup)); - } - - Iterator<? extends SlotInfo> iterator = freeSlots.iterator(); - Collection<SlotAssignment> assignments = new ArrayList<>(); - for (ExecutionSlotSharingGroup group : allGroups) { - assignments.add(new SlotAssignment(iterator.next(), group)); - } - return assignments; + parallelism -> { + SlotAssigner slotAssigner = + jobAllocationsInformation.isEmpty() + ? new DefaultSlotAssigner() + : new StateLocalitySlotAssigner(); + return new JobSchedulingPlan( + parallelism, + slotAssigner.assignSlots( + jobInformation, + slots, + parallelism, + jobAllocationsInformation)); + }); } /** @@ -200,24 +193,6 @@ public class SlotSharingSlotAllocator implements SlotAllocator { return vertexParallelism; } - private static List<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups( - VertexParallelism vertexParallelism, SlotSharingGroup slotSharingGroup) { - final Map<Integer, Set<ExecutionVertexID>> sharedSlotToVertexAssignment = new HashMap<>(); - - for (JobVertexID jobVertexId : slotSharingGroup.getJobVertexIds()) { - int parallelism = vertexParallelism.getParallelism(jobVertexId); - for (int subtaskIdx = 0; subtaskIdx < parallelism; subtaskIdx++) { - sharedSlotToVertexAssignment - .computeIfAbsent(subtaskIdx, ignored -> new HashSet<>()) - .add(new ExecutionVertexID(jobVertexId, subtaskIdx)); - } - } - - return sharedSlotToVertexAssignment.values().stream() - .map(ExecutionSlotSharingGroup::new) - .collect(Collectors.toList()); - } - @Override public Optional<ReservedSlots> tryReserveResources(JobSchedulingPlan jobSchedulingPlan) { final Collection<AllocationID> expectedSlots = @@ -278,10 +253,21 @@ public class SlotSharingSlotAllocator implements SlotAllocator { } static class ExecutionSlotSharingGroup { + private final String id; private final Set<ExecutionVertexID> containedExecutionVertices; public ExecutionSlotSharingGroup(Set<ExecutionVertexID> containedExecutionVertices) { + this(containedExecutionVertices, UUID.randomUUID().toString()); + } + + public ExecutionSlotSharingGroup( + Set<ExecutionVertexID> containedExecutionVertices, String id) { this.containedExecutionVertices = containedExecutionVertices; + this.id = id; + } + + public String getId() { + return id; } public Collection<ExecutionVertexID> getContainedExecutionVertices() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java new file mode 100644 index 00000000000..4e2a807c062 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssigner.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmaster.SlotInfo; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.stream.Collectors; + +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static org.apache.flink.runtime.scheduler.adaptive.allocator.DefaultSlotAssigner.createExecutionSlotSharingGroups; +import static org.apache.flink.util.Preconditions.checkState; + +/** A {@link SlotAssigner} that assigns slots based on the number of local key groups. */ +@Internal +public class StateLocalitySlotAssigner implements SlotAssigner { + + private static class AllocationScore implements Comparable<AllocationScore> { + + private final String groupId; + private final AllocationID allocationId; + + public AllocationScore(String groupId, AllocationID allocationId, long score) { + this.groupId = groupId; + this.allocationId = allocationId; + this.score = score; + } + + private final long score; + + public String getGroupId() { + return groupId; + } + + public AllocationID getAllocationId() { + return allocationId; + } + + public long getScore() { + return score; + } + + @Override + public int compareTo(StateLocalitySlotAssigner.AllocationScore other) { + int result = Long.compare(score, other.score); + if (result != 0) { + return result; + } + result = other.allocationId.compareTo(allocationId); + if (result != 0) { + return result; + } + return other.groupId.compareTo(groupId); + } + } + + @Override + public Collection<SlotAssignment> assignSlots( + JobInformation jobInformation, + Collection<? extends SlotInfo> freeSlots, + VertexParallelism vertexParallelism, + JobAllocationsInformation previousAllocations) { + checkState( + freeSlots.size() >= jobInformation.getSlotSharingGroups().size(), + "Not enough slots to allocate all the slot sharing groups (have: %s, need: %s)", + freeSlots.size(), + jobInformation.getSlotSharingGroups().size()); + + final List<ExecutionSlotSharingGroup> allGroups = new ArrayList<>(); + for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) { + allGroups.addAll(createExecutionSlotSharingGroups(vertexParallelism, slotSharingGroup)); + } + final Map<JobVertexID, Integer> parallelism = getParallelism(allGroups); + final PriorityQueue<AllocationScore> scores = + calculateScores(jobInformation, previousAllocations, allGroups, parallelism); + + final Map<String, ExecutionSlotSharingGroup> groupsById = + allGroups.stream().collect(toMap(ExecutionSlotSharingGroup::getId, identity())); + final Map<AllocationID, SlotInfo> slotsById = + freeSlots.stream().collect(toMap(SlotInfo::getAllocationId, identity())); + AllocationScore score; + final Collection<SlotAssignment> assignments = new ArrayList<>(); + while ((score = scores.poll()) != null) { + if (slotsById.containsKey(score.getAllocationId()) + && groupsById.containsKey(score.getGroupId())) { + assignments.add( + new SlotAssignment( + slotsById.remove(score.getAllocationId()), + groupsById.remove(score.getGroupId()))); + } + } + // Distribute the remaining slots with no score + Iterator<? extends SlotInfo> remainingSlots = slotsById.values().iterator(); + for (ExecutionSlotSharingGroup group : groupsById.values()) { + checkState( + remainingSlots.hasNext(), + "No slots available for group %s (%s more in total). This is likely a bug.", + group, + groupsById.size()); + assignments.add(new SlotAssignment(remainingSlots.next(), group)); + remainingSlots.remove(); + } + + return assignments; + } + + @Nonnull + private PriorityQueue<AllocationScore> calculateScores( + JobInformation jobInformation, + JobAllocationsInformation previousAllocations, + List<ExecutionSlotSharingGroup> allGroups, + Map<JobVertexID, Integer> parallelism) { + // PQ orders the pairs (allocationID, groupID) by score, decreasing + // the score is computed as the potential amount of state that would reside locally + final PriorityQueue<AllocationScore> scores = + new PriorityQueue<>(Comparator.reverseOrder()); + for (ExecutionSlotSharingGroup group : allGroups) { + scores.addAll(calculateScore(group, parallelism, jobInformation, previousAllocations)); + } + return scores; + } + + private static Map<JobVertexID, Integer> getParallelism( + List<ExecutionSlotSharingGroup> groups) { + final Map<JobVertexID, Integer> parallelism = new HashMap<>(); + for (ExecutionSlotSharingGroup group : groups) { + for (ExecutionVertexID evi : group.getContainedExecutionVertices()) { + parallelism.merge(evi.getJobVertexId(), 1, Integer::sum); + } + } + return parallelism; + } + + public Collection<AllocationScore> calculateScore( + ExecutionSlotSharingGroup group, + Map<JobVertexID, Integer> parallelism, + JobInformation jobInformation, + JobAllocationsInformation previousAllocations) { + final Map<AllocationID, Long> score = new HashMap<>(); + for (ExecutionVertexID evi : group.getContainedExecutionVertices()) { + final KeyGroupRange kgr = + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( + jobInformation + .getVertexInformation(evi.getJobVertexId()) + .getMaxParallelism(), + parallelism.get(evi.getJobVertexId()), + evi.getSubtaskIndex()); + previousAllocations + .getAllocations(evi.getJobVertexId()) + .forEach( + allocation -> { + long value = + allocation + .getKeyGroupRange() + .getIntersection(kgr) + .getNumberOfKeyGroups(); + if (value > 0) { + score.merge(allocation.getAllocationID(), value, Long::sum); + } + }); + } + + return score.entrySet().stream() + .map(e -> new AllocationScore(group.getId(), e.getKey(), e.getValue())) + .collect(Collectors.toList()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java index 7643acc7410..9a6b1cab5c2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocatorTest.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -24,25 +25,31 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotInfo; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan; +import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.topology.VertexID; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.util.TestLogger; import org.assertj.core.api.Assertions; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; +import java.util.Set; +import java.util.stream.IntStream; +import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -79,12 +86,11 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { final ResourceCounter resourceCounter = slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, vertex2, vertex3)); - assertThat(resourceCounter.getResources(), contains(ResourceProfile.UNKNOWN)); - assertThat( - resourceCounter.getResourceCount(ResourceProfile.UNKNOWN), - is( + assertThat(resourceCounter.getResources()).contains(ResourceProfile.UNKNOWN); + assertThat(resourceCounter.getResourceCount(ResourceProfile.UNKNOWN)) + .isEqualTo( Math.max(vertex1.getParallelism(), vertex2.getParallelism()) - + vertex3.getParallelism())); + + vertex3.getParallelism()); } @Test @@ -194,7 +200,8 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { final JobSchedulingPlan jobSchedulingPlan = slotAllocator - .determineParallelismAndCalculateAssignment(jobInformation, getSlots(50)) + .determineParallelismAndCalculateAssignment( + jobInformation, getSlots(50), JobAllocationsInformation.empty()) .get(); final ReservedSlots reservedSlots = @@ -234,7 +241,8 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { JobSchedulingPlan jobSchedulingPlan = slotSharingSlotAllocator - .determineParallelismAndCalculateAssignment(jobInformation, getSlots(50)) + .determineParallelismAndCalculateAssignment( + jobInformation, getSlots(50), JobAllocationsInformation.empty()) .get(); final Optional<? extends ReservedSlots> reservedSlots = @@ -243,6 +251,73 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { assertFalse(reservedSlots.isPresent()); } + /** + * Basic test to verify that allocation takes previous allocations into account to facilitate + * Local Recovery. + */ + @Test + public void testStickyAllocation() { + Map<JobVertexID, List<VertexAllocationInformation>> locality = new HashMap<>(); + + // previous allocation allocation1: v1, v2 + AllocationID allocation1 = new AllocationID(); + locality.put( + vertex1.getJobVertexID(), + Collections.singletonList( + new VertexAllocationInformation( + allocation1, vertex1.getJobVertexID(), KeyGroupRange.of(1, 100)))); + locality.put( + vertex2.getJobVertexID(), + Collections.singletonList( + new VertexAllocationInformation( + allocation1, vertex2.getJobVertexID(), KeyGroupRange.of(1, 100)))); + + // previous allocation allocation2: v3 + AllocationID allocation2 = new AllocationID(); + locality.put( + vertex3.getJobVertexID(), + Collections.singletonList( + new VertexAllocationInformation( + allocation2, vertex3.getJobVertexID(), KeyGroupRange.of(1, 100)))); + + List<SlotInfo> freeSlots = new ArrayList<>(); + IntStream.range(0, 10).forEach(i -> freeSlots.add(new TestSlotInfo(new AllocationID()))); + freeSlots.add(new TestSlotInfo(allocation1)); + freeSlots.add(new TestSlotInfo(allocation2)); + + Map<JobVertexID, Long> stateSizes = new HashMap<>(); + stateSizes.put(vertex1.getJobVertexID(), 10L); + stateSizes.put(vertex2.getJobVertexID(), 10L); + stateSizes.put(vertex3.getJobVertexID(), 10L); + + JobSchedulingPlan schedulingPlan = + SlotSharingSlotAllocator.createSlotSharingSlotAllocator( + (allocationId, resourceProfile) -> + TestingPhysicalSlot.builder().build(), + (allocationID, cause, ts) -> {}, + id -> false) + .determineParallelismAndCalculateAssignment( + new TestJobInformation(Arrays.asList(vertex1, vertex2, vertex3)), + freeSlots, + new JobAllocationsInformation(locality)) + .get(); + + Map<AllocationID, Set<VertexID>> allocated = new HashMap<>(); + for (JobSchedulingPlan.SlotAssignment assignment : schedulingPlan.getSlotAssignments()) { + ExecutionSlotSharingGroup target = + assignment.getTargetAs(ExecutionSlotSharingGroup.class); + Set<VertexID> set = + allocated.computeIfAbsent( + assignment.getSlotInfo().getAllocationId(), ign -> new HashSet<>()); + for (ExecutionVertexID id : target.getContainedExecutionVertices()) { + set.add(id.getJobVertexId()); + } + } + assertThat(allocated.get(allocation1)).contains(vertex1.getJobVertexID()); + assertThat(allocated.get(allocation1)).contains(vertex2.getJobVertexID()); + assertThat(allocated.get(allocation2)).contains(vertex3.getJobVertexID()); + } + private static Collection<SlotInfo> getSlots(int count) { final Collection<SlotInfo> slotInfo = new ArrayList<>(); for (int i = 0; i < count; i++) { @@ -250,68 +325,4 @@ public class SlotSharingSlotAllocatorTest extends TestLogger { } return slotInfo; } - - private static class TestJobInformation implements JobInformation { - - private final Map<JobVertexID, VertexInformation> vertexIdToInformation; - private final Collection<SlotSharingGroup> slotSharingGroups; - - private TestJobInformation(Collection<VertexInformation> vertexIdToInformation) { - this.vertexIdToInformation = - vertexIdToInformation.stream() - .collect( - Collectors.toMap( - VertexInformation::getJobVertexID, - Function.identity())); - this.slotSharingGroups = - vertexIdToInformation.stream() - .map(VertexInformation::getSlotSharingGroup) - .collect(Collectors.toSet()); - } - - @Override - public Collection<SlotSharingGroup> getSlotSharingGroups() { - return slotSharingGroups; - } - - @Override - public VertexInformation getVertexInformation(JobVertexID jobVertexId) { - return vertexIdToInformation.get(jobVertexId); - } - - @Override - public Iterable<VertexInformation> getVertices() { - return vertexIdToInformation.values(); - } - } - - private static class TestVertexInformation implements JobInformation.VertexInformation { - - private final JobVertexID jobVertexId; - private final int parallelism; - private final SlotSharingGroup slotSharingGroup; - - private TestVertexInformation( - JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) { - this.jobVertexId = jobVertexId; - this.parallelism = parallelism; - this.slotSharingGroup = slotSharingGroup; - slotSharingGroup.addVertexToGroup(jobVertexId); - } - - @Override - public JobVertexID getJobVertexID() { - return jobVertexId; - } - - @Override - public int getParallelism() { - return parallelism; - } - - @Override - public SlotSharingGroup getSlotSharingGroup() { - return slotSharingGroup; - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java new file mode 100644 index 00000000000..aa9ea5a6bf9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/StateLocalitySlotAssignerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan.SlotAssignment; +import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation.VertexAllocationInformation; +import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation.VertexInformation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import org.hamcrest.MatcherAssert; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; + +/** {@link StateLocalitySlotAssigner} test. */ +class StateLocalitySlotAssignerTest { + @Test + public void testSlotsAreNotWasted() { + VertexInformation vertex = createVertex(2); + AllocationID alloc1 = new AllocationID(); + AllocationID alloc2 = new AllocationID(); + + List<VertexAllocationInformation> allocations = + Arrays.asList( + new VertexAllocationInformation( + alloc1, vertex.getJobVertexID(), KeyGroupRange.of(0, 9)), + new VertexAllocationInformation( + alloc2, vertex.getJobVertexID(), KeyGroupRange.of(10, 19))); + + assign(vertex, Arrays.asList(alloc1, alloc2), allocations); + } + + @Test + public void testUpScaling() { + final int oldParallelism = 3; + final int newParallelism = 7; + final int numFreeSlots = 100; + final VertexInformation vertex = createVertex(newParallelism); + final List<AllocationID> allocationIDs = createAllocationIDS(numFreeSlots); + + List<VertexAllocationInformation> prevAllocations = new ArrayList<>(); + Iterator<AllocationID> iterator = allocationIDs.iterator(); + for (int i = 0; i < oldParallelism; i++) { + prevAllocations.add( + new VertexAllocationInformation( + iterator.next(), + vertex.getJobVertexID(), + KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex( + vertex.getMaxParallelism(), oldParallelism, i))); + } + + Collection<SlotAssignment> assignments = assign(vertex, allocationIDs, prevAllocations); + + verifyAssignments( + assignments, + newParallelism, + prevAllocations.stream() + .map(VertexAllocationInformation::getAllocationID) + .toArray(AllocationID[]::new)); + } + + @Test + public void testDownScaling() { + final int oldParallelism = 5; + final int newParallelism = 1; + final int numFreeSlots = 100; + final VertexInformation vertex = createVertex(newParallelism); + final List<AllocationID> allocationIDs = createAllocationIDS(numFreeSlots); + + // pretend that the 1st (0) subtask had half of key groups ... + final Iterator<AllocationID> iterator = allocationIDs.iterator(); + final AllocationID biggestAllocation = iterator.next(); + final List<VertexAllocationInformation> prevAllocations = new ArrayList<>(); + final int halfOfKeyGroupRange = vertex.getMaxParallelism() / 2; + prevAllocations.add( + new VertexAllocationInformation( + biggestAllocation, + vertex.getJobVertexID(), + KeyGroupRange.of(0, halfOfKeyGroupRange - 1))); + + // and the remaining subtasks had only one key group each + for (int subtaskIdx = 1; subtaskIdx < oldParallelism; subtaskIdx++) { + int keyGroup = halfOfKeyGroupRange + subtaskIdx; + prevAllocations.add( + new VertexAllocationInformation( + iterator.next(), + vertex.getJobVertexID(), + KeyGroupRange.of(keyGroup, keyGroup))); + } + + Collection<SlotAssignment> assignments = assign(vertex, allocationIDs, prevAllocations); + + verifyAssignments(assignments, newParallelism, biggestAllocation); + } + + private static void verifyAssignments( + Collection<SlotAssignment> assignments, + int expectedSize, + AllocationID... mustHaveAllocationID) { + MatcherAssert.assertThat(assignments, hasSize(expectedSize)); + MatcherAssert.assertThat( + assignments.stream() + .map(e -> e.getSlotInfo().getAllocationId()) + .collect(Collectors.toSet()), + hasItems(mustHaveAllocationID)); + } + + private static Collection<SlotAssignment> assign( + VertexInformation vertexInformation, + List<AllocationID> allocationIDs, + List<VertexAllocationInformation> allocations) { + return new StateLocalitySlotAssigner() + .assignSlots( + new TestJobInformation(singletonList(vertexInformation)), + allocationIDs.stream().map(TestSlotInfo::new).collect(Collectors.toList()), + new VertexParallelism( + singletonMap( + vertexInformation.getJobVertexID(), + vertexInformation.getParallelism())), + new JobAllocationsInformation( + singletonMap(vertexInformation.getJobVertexID(), allocations))); + } + + private static VertexInformation createVertex(int parallelism) { + JobVertexID id = new JobVertexID(); + SlotSharingGroup slotSharingGroup = new SlotSharingGroup(); + slotSharingGroup.addVertexToGroup(id); + return new TestVertexInformation(id, parallelism, slotSharingGroup); + } + + private static List<AllocationID> createAllocationIDS(int numFreeSlots) { + return IntStream.range(0, numFreeSlots) + .mapToObj(i -> new AllocationID()) + .collect(Collectors.toList()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java new file mode 100644 index 00000000000..b9a3671d3c4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestJobInformation.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.allocator; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; + +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +class TestJobInformation implements JobInformation { + + private final Map<JobVertexID, VertexInformation> vertexIdToInformation; + private final Collection<SlotSharingGroup> slotSharingGroups; + + TestJobInformation(Collection<? extends VertexInformation> vertexIdToInformation) { + this.vertexIdToInformation = + vertexIdToInformation.stream() + .collect( + Collectors.toMap( + VertexInformation::getJobVertexID, Function.identity())); + this.slotSharingGroups = + vertexIdToInformation.stream() + .map(VertexInformation::getSlotSharingGroup) + .collect(Collectors.toSet()); + } + + @Override + public Collection<SlotSharingGroup> getSlotSharingGroups() { + return slotSharingGroups; + } + + @Override + public VertexInformation getVertexInformation(JobVertexID jobVertexId) { + return vertexIdToInformation.get(jobVertexId); + } + + @Override + public Iterable<VertexInformation> getVertices() { + return vertexIdToInformation.values(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java index b023e5edf49..e467554ddfa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestSlotInfo.java @@ -26,7 +26,15 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; /** Test {@link SlotInfo} implementation. */ class TestSlotInfo implements SlotInfo { - private final AllocationID allocationId = new AllocationID(); + private final AllocationID allocationId; + + public TestSlotInfo() { + this(new AllocationID()); + } + + public TestSlotInfo(AllocationID allocationId) { + this.allocationId = allocationId; + } @Override public AllocationID getAllocationId() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java similarity index 53% copy from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java copy to flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java index 99b8d48c051..4666ba20647 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/JobInformation.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestVertexInformation.java @@ -20,30 +20,37 @@ package org.apache.flink.runtime.scheduler.adaptive.allocator; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import java.util.Collection; - -/** Information about the job. */ -public interface JobInformation { - /** - * Returns all slot-sharing groups of the job. - * - * <p>Attention: The returned slot sharing groups should never be modified (they are indeed - * mutable)! - * - * @return all slot-sharing groups of the job - */ - Collection<SlotSharingGroup> getSlotSharingGroups(); - - VertexInformation getVertexInformation(JobVertexID jobVertexId); +class TestVertexInformation implements JobInformation.VertexInformation { + + private final JobVertexID jobVertexId; + private final int parallelism; + private final SlotSharingGroup slotSharingGroup; + + TestVertexInformation( + JobVertexID jobVertexId, int parallelism, SlotSharingGroup slotSharingGroup) { + this.jobVertexId = jobVertexId; + this.parallelism = parallelism; + this.slotSharingGroup = slotSharingGroup; + slotSharingGroup.addVertexToGroup(jobVertexId); + } - Iterable<VertexInformation> getVertices(); + @Override + public JobVertexID getJobVertexID() { + return jobVertexId; + } - /** Information about a single vertex. */ - interface VertexInformation { - JobVertexID getJobVertexID(); + @Override + public int getParallelism() { + return parallelism; + } - int getParallelism(); + @Override + public int getMaxParallelism() { + return 128; + } - SlotSharingGroup getSlotSharingGroup(); + @Override + public SlotSharingGroup getSlotSharingGroup() { + return slotSharingGroup; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java index 44b73122ae9..0fd638c43bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/allocator/TestingSlotAllocator.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.util.ResourceCounter; import java.util.Collection; import java.util.Optional; -import java.util.function.BiFunction; import java.util.function.Function; /** Testing implementation of {@link SlotAllocator}. */ @@ -33,20 +32,13 @@ public class TestingSlotAllocator implements SlotAllocator { private final Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction; - private final BiFunction< - JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>> - determineParallelismFunction; - private final Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction; private TestingSlotAllocator( Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction, - BiFunction<JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>> - determineParallelismFunction, Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) { this.calculateRequiredSlotsFunction = calculateRequiredSlotsFunction; - this.determineParallelismFunction = determineParallelismFunction; this.tryReserveResourcesFunction = tryReserveResourcesFunction; } @@ -59,12 +51,14 @@ public class TestingSlotAllocator implements SlotAllocator { @Override public Optional<VertexParallelism> determineParallelism( JobInformation jobInformation, Collection<? extends SlotInfo> slots) { - return determineParallelismFunction.apply(jobInformation, slots); + return Optional.empty(); } @Override public Optional<JobSchedulingPlan> determineParallelismAndCalculateAssignment( - JobInformation jobInformation, Collection<? extends SlotInfo> slots) { + JobInformation jobInformation, + Collection<? extends SlotInfo> slots, + JobAllocationsInformation jobAllocationsInformation) { return Optional.empty(); } @@ -81,9 +75,6 @@ public class TestingSlotAllocator implements SlotAllocator { public static final class Builder { private Function<Iterable<JobInformation.VertexInformation>, ResourceCounter> calculateRequiredSlotsFunction = ignored -> ResourceCounter.empty(); - private BiFunction< - JobInformation, Collection<? extends SlotInfo>, Optional<VertexParallelism>> - determineParallelismFunction = (ignoredA, ignoredB) -> Optional.empty(); private Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction = ignored -> Optional.empty(); @@ -94,16 +85,6 @@ public class TestingSlotAllocator implements SlotAllocator { return this; } - public Builder setDetermineParallelismFunction( - BiFunction< - JobInformation, - Collection<? extends SlotInfo>, - Optional<VertexParallelism>> - determineParallelismFunction) { - this.determineParallelismFunction = determineParallelismFunction; - return this; - } - public Builder setTryReserveResourcesFunction( Function<VertexParallelism, Optional<ReservedSlots>> tryReserveResourcesFunction) { this.tryReserveResourcesFunction = tryReserveResourcesFunction; @@ -112,9 +93,7 @@ public class TestingSlotAllocator implements SlotAllocator { public TestingSlotAllocator build() { return new TestingSlotAllocator( - calculateRequiredSlotsFunction, - determineParallelismFunction, - tryReserveResourcesFunction); + calculateRequiredSlotsFunction, tryReserveResourcesFunction); } } }
