This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8b5e172118a3f40bc14f9ab1ec969754615813a0
Author: Till Rohrmann <[email protected]>
AuthorDate: Sun Mar 14 17:34:41 2021 +0100

    [FLINK-21602] Remove generic parameter from SlotAllocator
    
    The generic parameter will no longer be needed once the parallelism 
calculation and the
    reservation of slots happens in two separate steps.
---
 .../runtime/scheduler/adaptive/AdaptiveScheduler.java     |  9 ++++-----
 .../scheduler/adaptive/allocator/SlotAllocator.java       |  6 +++---
 .../adaptive/allocator/SlotSharingSlotAllocator.java      | 15 +++++++++++++--
 3 files changed, 20 insertions(+), 10 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index be7b068..f420ce5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -190,7 +190,7 @@ public class AdaptiveScheduler
 
     private final JobStatusListener jobStatusListener;
 
-    private final SlotAllocator<?> slotAllocator;
+    private final SlotAllocator slotAllocator;
 
     private final ScaleUpController scaleUpController;
 
@@ -581,11 +581,10 @@ public class AdaptiveScheduler
         return outstandingResources.isEmpty();
     }
 
-    private <T extends VertexParallelism>
-            ParallelismAndResourceAssignments 
determineParallelismAndAssignResources(
-                    SlotAllocator<T> slotAllocator) throws 
JobExecutionException {
+    private ParallelismAndResourceAssignments 
determineParallelismAndAssignResources(
+            SlotAllocator slotAllocator) throws JobExecutionException {
 
-        final T vertexParallelism =
+        final VertexParallelism vertexParallelism =
                 slotAllocator
                         .determineParallelism(
                                 jobInformation, 
declarativeSlotPool.getFreeSlotsInformation())
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
index 3bf1a2a..2519db8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotAllocator.java
@@ -27,7 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 
 /** Component for calculating the slot requirements and mapping of vertices to 
slots. */
-public interface SlotAllocator<T extends VertexParallelism> {
+public interface SlotAllocator {
 
     /**
      * Calculates the total resources required for scheduling the given 
vertices.
@@ -58,7 +58,7 @@ public interface SlotAllocator<T extends VertexParallelism> {
      *     how the vertices could be assigned to slots, if all vertices could 
be run with the given
      *     slots
      */
-    Optional<T> determineParallelism(
+    Optional<? extends VertexParallelism> determineParallelism(
             JobInformation jobInformation, Collection<? extends SlotInfo> 
slots);
 
     /**
@@ -67,5 +67,5 @@ public interface SlotAllocator<T extends VertexParallelism> {
      * @param vertexParallelism information on how slots should be assigned to 
the slots
      * @return mapping of vertices to slots
      */
-    Map<ExecutionVertexID, LogicalSlot> reserveResources(T vertexParallelism);
+    Map<ExecutionVertexID, LogicalSlot> reserveResources(VertexParallelism 
vertexParallelism);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
index 7e256ec..26853cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -40,7 +41,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /** {@link SlotAllocator} implementation that supports slot sharing. */
-public class SlotSharingSlotAllocator implements 
SlotAllocator<VertexParallelismWithSlotSharing> {
+public class SlotSharingSlotAllocator implements SlotAllocator {
 
     private final ReserveSlotFunction reserveSlotFunction;
     private final FreeSlotFunction freeSlotFunction;
@@ -148,7 +149,17 @@ public class SlotSharingSlotAllocator implements 
SlotAllocator<VertexParallelism
 
     @Override
     public Map<ExecutionVertexID, LogicalSlot> reserveResources(
-            VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing) 
{
+            VertexParallelism vertexParallelism) {
+        Preconditions.checkArgument(
+                vertexParallelism instanceof VertexParallelismWithSlotSharing,
+                String.format(
+                        "%s expects %s as argument.",
+                        SlotSharingSlotAllocator.class.getSimpleName(),
+                        
VertexParallelismWithSlotSharing.class.getSimpleName()));
+
+        final VertexParallelismWithSlotSharing 
vertexParallelismWithSlotSharing =
+                (VertexParallelismWithSlotSharing) vertexParallelism;
+
         final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new 
HashMap<>();
 
         for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :

Reply via email to