This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8b5e172118a3f40bc14f9ab1ec969754615813a0 Author: Till Rohrmann <[email protected]> AuthorDate: Sun Mar 14 17:34:41 2021 +0100 [FLINK-21602] Remove generic parameter from SlotAllocator The generic parameter will no longer be needed once the parallelism calculation and the reservation of slots happen in two separate steps. --- .../runtime/scheduler/adaptive/AdaptiveScheduler.java | 9 ++++----- .../scheduler/adaptive/allocator/SlotAllocator.java | 6 +++--- .../adaptive/allocator/SlotSharingSlotAllocator.java | 15 +++++++++++++-- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index be7b068..f420ce5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -190,7 +190,7 @@ public class AdaptiveScheduler private final JobStatusListener jobStatusListener; - private final SlotAllocator<?> slotAllocator; + private final SlotAllocator slotAllocator; private final ScaleUpController scaleUpController; @@ -581,11 +581,10 @@ public class AdaptiveScheduler return outstandingResources.isEmpty(); } - private <T extends VertexParallelism> - ParallelismAndResourceAssignments determineParallelismAndAssignResources( - SlotAllocator<T> slotAllocator) throws JobExecutionException { + private ParallelismAndResourceAssignments determineParallelismAndAssignResources( + SlotAllocator slotAllocator) throws JobExecutionException { - final T vertexParallelism = + final VertexParallelism vertexParallelism = slotAllocator .determineParallelism( jobInformation, declarativeSlotPool.getFreeSlotsInformation()) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java index 3bf1a2a..2519db8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java @@ -27,7 +27,7 @@ import java.util.Map; import java.util.Optional; /** Component for calculating the slot requirements and mapping of vertices to slots. */ -public interface SlotAllocator<T extends VertexParallelism> { +public interface SlotAllocator { /** * Calculates the total resources required for scheduling the given vertices. @@ -58,7 +58,7 @@ public interface SlotAllocator<T extends VertexParallelism> { * how the vertices could be assigned to slots, if all vertices could be run with the given * slots */ - Optional<T> determineParallelism( + Optional<? extends VertexParallelism> determineParallelism( JobInformation jobInformation, Collection<? 
extends SlotInfo> slots); /** @@ -67,5 +67,5 @@ public interface SlotAllocator<T extends VertexParallelism> { * @param vertexParallelism information on how slots should be assigned to the slots * @return mapping of vertices to slots */ - Map<ExecutionVertexID, LogicalSlot> reserveResources(T vertexParallelism); + Map<ExecutionVertexID, LogicalSlot> reserveResources(VertexParallelism vertexParallelism); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java index 7e256ec..26853cf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.Preconditions; import java.util.ArrayList; import java.util.Collection; @@ -40,7 +41,7 @@ import java.util.Set; import java.util.stream.Collectors; /** {@link SlotAllocator} implementation that supports slot sharing. 
*/ -public class SlotSharingSlotAllocator implements SlotAllocator<VertexParallelismWithSlotSharing> { +public class SlotSharingSlotAllocator implements SlotAllocator { private final ReserveSlotFunction reserveSlotFunction; private final FreeSlotFunction freeSlotFunction; @@ -148,7 +149,17 @@ public class SlotSharingSlotAllocator implements SlotAllocator<VertexParallelism @Override public Map<ExecutionVertexID, LogicalSlot> reserveResources( - VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing) { + VertexParallelism vertexParallelism) { + Preconditions.checkArgument( + vertexParallelism instanceof VertexParallelismWithSlotSharing, + String.format( + "%s expects %s as argument.", + SlotSharingSlotAllocator.class.getSimpleName(), + VertexParallelismWithSlotSharing.class.getSimpleName())); + + final VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing = + (VertexParallelismWithSlotSharing) vertexParallelism; + final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new HashMap<>(); for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
