xintongsong commented on a change in pull request #14560:
URL: https://github.com/apache/flink/pull/14560#discussion_r553155630



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -288,22 +288,13 @@ public boolean allocateSlot(
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
-            LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
-            return false;
-        }
-
-        if (taskSlots.containsKey(index)) {
-            TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
+            return isDuplicatedSlot(taskSlot, jobId, resourceProfile, index);
+        } else if (!isIndexAlreadyTaken(index)) {

Review comment:
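       The `!` should be removed. This branch logs that the index is already taken and returns `false`, so it must run when `isIndexAlreadyTaken(index)` is true, not when it is false.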

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -277,27 +280,31 @@ public boolean allocateSlot(
 
     @Override
     public boolean allocateSlot(
-            int index,
+            int requestedIndex,
             JobID jobId,
             AllocationID allocationId,
             ResourceProfile resourceProfile,
             Time slotTimeout) {
         checkRunning();
 
-        Preconditions.checkArgument(index < numberSlots);
+        Preconditions.checkArgument(requestedIndex < numberSlots);
+
+        // The negative requestIndex indicate that the SlotManger allocate a dynamic slot, we
+        // transfer the index to an increasing number not less than the numberSlots.
+        int index = requestedIndex < 0 ? nextDynamicSlotIndex() : requestedIndex;
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
             return isDuplicatedSlot(taskSlot, jobId, resourceProfile, index);
-        } else if (!isIndexAlreadyTaken(index)) {
+        } else if (isIndexAlreadyTaken(index)) {
             LOG.info(
                     "The static slot with index {} is already assigned to another allocation with id {}.",
                     index,
                     taskSlots.get(index).getAllocationId());
             return false;
         }
 
-        resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
+        resourceProfile = index < numberSlots ? defaultSlotResourceProfile : resourceProfile;

Review comment:
       There are 3 occurrences of `index < numberSlots` and 1 occurrence of `index >= numberSlots` in this file.
   Let's deduplicate them with a util method.
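
   To illustrate the kind of util method meant here, a minimal standalone sketch follows. It is not the actual `TaskSlotTableImpl` code: the class `SlotIndexSketch`, the helper name `isDynamicIndex`, the `dynamicSlotIndex` counter, and the use of `String` in place of `ResourceProfile` are all assumptions made for the example. Only `numberSlots`, `nextDynamicSlotIndex()`, and the two comparisons come from the diff.

```java
/**
 * Hypothetical, simplified stand-in for the index handling discussed above.
 * Only the index arithmetic is modelled; everything else is omitted.
 */
public class SlotIndexSketch {
    private final int numberSlots;

    // Assumed counter for dynamic slots: starts at numberSlots and only grows,
    // so dynamic indices never collide with static ones.
    private int dynamicSlotIndex;

    public SlotIndexSketch(int numberSlots) {
        this.numberSlots = numberSlots;
        this.dynamicSlotIndex = numberSlots;
    }

    /** Single place for the `index >= numberSlots` check (hypothetical name). */
    public boolean isDynamicIndex(int index) {
        return index >= numberSlots;
    }

    /** Returns the next index reserved for a dynamic slot. */
    public int nextDynamicSlotIndex() {
        return dynamicSlotIndex++;
    }

    /** Maps a negative requested index to a fresh dynamic index. */
    public int resolveIndex(int requestedIndex) {
        if (requestedIndex >= numberSlots) {
            throw new IllegalArgumentException("requestedIndex must be < numberSlots");
        }
        return requestedIndex < 0 ? nextDynamicSlotIndex() : requestedIndex;
    }

    /** Static slots use the default profile; dynamic slots keep the requested one. */
    public String chooseProfile(int index, String requestedProfile, String defaultProfile) {
        return isDynamicIndex(index) ? requestedProfile : defaultProfile;
    }
}
```

   With a helper like this, each `index < numberSlots` site becomes `!isDynamicIndex(index)`, so the boundary between the two index ranges is defined in one place.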

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##########
@@ -288,22 +288,13 @@ public boolean allocateSlot(
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
-            LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
-            return false;
-        }
-
-        if (taskSlots.containsKey(index)) {
-            TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
+            return isDuplicatedSlot(taskSlot, jobId, resourceProfile, index);
+        } else if (!isIndexAlreadyTaken(index)) {
             LOG.info(
-                    "Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
+                    "The static slot with index {} is already assigned to another allocation with id {}.",

Review comment:
       I'm not sure we should expose the concept of a *static* slot in this log message.



