This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8dc257dcd3907cd42c15bd1748e4e1954e15696c
Author: Yangze Guo <[email protected]>
AuthorDate: Tue Dec 22 15:14:46 2020 +0800

    [FLINK-20837][runtime] Refactor dynamic SlotID
    
    This closes #14560
---
 .../runtime/clusterframework/types/SlotID.java     |  4 +--
 .../taskexecutor/slot/TaskSlotTableImpl.java       | 35 ++++++++++++++--------
 .../runtime/taskexecutor/TaskExecutorTest.java     |  4 +--
 .../taskexecutor/slot/TaskSlotTableImplTest.java   | 14 ++++-----
 4 files changed, 34 insertions(+), 23 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index 36628a1..dc8aa80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -90,8 +90,8 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
         return resourceId + "_" + (slotNumber >= 0 ? slotNumber : "dynamic");
     }
 
-    /** Generate a SlotID without actual slot index for dynamic slot 
allocation. */
-    public static SlotID generateDynamicSlotID(ResourceID resourceID) {
+    /** Get a SlotID without actual slot index for dynamic slot allocation. */
+    public static SlotID getDynamicSlotID(ResourceID resourceID) {
         return new SlotID(resourceID);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
index 9d93d59..33ccbbd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
@@ -95,6 +95,9 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
     /** The table state. */
     private volatile State state;
 
+    /** Current index for dynamic slot, should always not less than 
numberSlots */
+    private int dynamicSlotIndex;
+
     private final ResourceBudgetManager budgetManager;
 
     /** The closing future is completed when all slot are freed and state is 
closed. */
@@ -120,6 +123,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
                 0 < numberSlots, "The number of task slots must be greater 
than 0.");
 
         this.numberSlots = numberSlots;
+        this.dynamicSlotIndex = numberSlots;
         this.defaultSlotResourceProfile = 
Preconditions.checkNotNull(defaultSlotResourceProfile);
         this.memoryPageSize = memoryPageSize;
 
@@ -247,11 +251,10 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
         }
 
         for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
-            if (taskSlot.getIndex() < 0) {
-                SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
+            if (isDynamicIndex(taskSlot.getIndex())) {
                 SlotStatus slotStatus =
                         new SlotStatus(
-                                slotID,
+                                new SlotID(resourceId, taskSlot.getIndex()),
                                 taskSlot.getResourceProfile(),
                                 taskSlot.getJobId(),
                                 taskSlot.getAllocationId());
@@ -277,14 +280,18 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
 
     @Override
     public boolean allocateSlot(
-            int index,
+            int requestedIndex,
             JobID jobId,
             AllocationID allocationId,
             ResourceProfile resourceProfile,
             Time slotTimeout) {
         checkRunning();
 
-        Preconditions.checkArgument(index < numberSlots);
+        Preconditions.checkArgument(requestedIndex < numberSlots);
+
+        // The negative requestIndex indicate that the SlotManger allocate a 
dynamic slot, we
+        // transfer the index to an increasing number not less than the 
numberSlots.
+        int index = requestedIndex < 0 ? nextDynamicSlotIndex() : 
requestedIndex;
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
@@ -297,7 +304,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
             return false;
         }
 
-        resourceProfile = index >= 0 ? defaultSlotResourceProfile : 
resourceProfile;
+        resourceProfile = isDynamicIndex(index) ? resourceProfile : 
defaultSlotResourceProfile;
 
         if (!budgetManager.reserve(resourceProfile)) {
             LOG.info(
@@ -317,9 +324,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
                         jobId,
                         allocationId,
                         memoryVerificationExecutor);
-        if (index >= 0) {
-            taskSlots.put(index, taskSlot);
-        }
+        taskSlots.put(index, taskSlot);
 
         // update the allocation id to task slot map
         allocatedSlots.put(allocationId, taskSlot);
@@ -351,13 +356,17 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
                 index);
         return taskSlot.getJobId().equals(jobId)
                 && taskSlot.getResourceProfile().equals(resourceProfile)
-                && (index < 0 || taskSlot.getIndex() == index);
+                && (isDynamicIndex(index) || taskSlot.getIndex() == index);
     }
 
     private boolean isIndexAlreadyTaken(int index) {
         return taskSlots.get(index) != null;
     }
 
+    private boolean isDynamicIndex(int index) {
+        return index >= numberSlots;
+    }
+
     @Override
     public boolean markSlotActive(AllocationID allocationId) throws 
SlotNotFoundException {
         checkRunning();
@@ -469,8 +478,6 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
         TaskSlot<T> taskSlot = taskSlots.get(index);
         if (taskSlot != null) {
             return taskSlot.isAllocated(jobId, allocationId);
-        } else if (index < 0) {
-            return allocatedSlots.containsKey(allocationId);
         } else {
             return false;
         }
@@ -625,6 +632,10 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
         return allocatedSlots.get(allocationId);
     }
 
+    private int nextDynamicSlotIndex() {
+        return dynamicSlotIndex++;
+    }
+
     private void checkRunning() {
         Preconditions.checkState(
                 state == State.RUNNING,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 9f94c07..7f0f558 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -2075,7 +2075,7 @@ public class TaskExecutorTest extends TestLogger {
                     .taskExecutor
                     .getSelfGateway(TaskExecutorGateway.class)
                     .requestSlot(
-                            
SlotID.generateDynamicSlotID(ResourceID.generate()),
+                            SlotID.getDynamicSlotID(ResourceID.generate()),
                             jobId,
                             allocationId,
                             resourceProfile,
@@ -2092,7 +2092,7 @@ public class TaskExecutorTest extends TestLogger {
                             new SlotStatus(new SlotID(resourceId, 0), 
DEFAULT_RESOURCE_PROFILE),
                             new SlotStatus(new SlotID(resourceId, 1), 
DEFAULT_RESOURCE_PROFILE),
                             new SlotStatus(
-                                    SlotID.generateDynamicSlotID(resourceId),
+                                    new SlotID(resourceId, 2),
                                     resourceProfile,
                                     jobId,
                                     allocationId)));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
index 72e6717..067f8b7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
@@ -158,7 +158,7 @@ public class TaskSlotTableImplTest extends TestLogger {
             assertThat(
                     taskSlotTable.allocateSlot(-1, jobId2, allocationId, 
SLOT_TIMEOUT), is(false));
 
-            assertThat(taskSlotTable.isAllocated(-1, jobId1, allocationId), 
is(true));
+            assertThat(taskSlotTable.isAllocated(1, jobId1, allocationId), 
is(true));
 
             Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
                     taskSlotTable.getAllocatedSlots(jobId1);
@@ -201,7 +201,7 @@ public class TaskSlotTableImplTest extends TestLogger {
             allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
             TaskSlot<TaskSlotPayload> taskSlot2 = allocatedSlots.next();
 
-            assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), 
is(true));
+            assertThat(taskSlotTable.isAllocated(1, jobId, allocationId), 
is(true));
             assertEquals(taskSlot1, taskSlot2);
             assertThat(allocatedSlots.hasNext(), is(false));
         }
@@ -239,9 +239,9 @@ public class TaskSlotTableImplTest extends TestLogger {
 
             Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
                     taskSlotTable.getAllocatedSlots(jobId);
-            assertThat(allocatedSlots.next().getIndex(), is(-1));
+            assertThat(allocatedSlots.next().getIndex(), is(2));
             assertThat(allocatedSlots.hasNext(), is(false));
-            assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), 
is(true));
+            assertThat(taskSlotTable.isAllocated(2, jobId, allocationId), 
is(true));
         }
     }
 
@@ -262,7 +262,7 @@ public class TaskSlotTableImplTest extends TestLogger {
             Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
                     taskSlotTable.getAllocatedSlots(jobId);
             TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next();
-            assertThat(allocatedSlot.getIndex(), is(-1));
+            assertThat(allocatedSlot.getIndex(), is(2));
             assertThat(allocatedSlot.getResourceProfile(), 
is(resourceProfile));
             assertThat(allocatedSlots.hasNext(), is(false));
         }
@@ -305,7 +305,7 @@ public class TaskSlotTableImplTest extends TestLogger {
                     taskSlotTable.allocateSlot(-1, jobId, allocationId3, 
SLOT_TIMEOUT),
                     is(true)); // index 4
 
-            assertThat(taskSlotTable.freeSlot(allocationId2), is(-1));
+            assertThat(taskSlotTable.freeSlot(allocationId2), is(3));
 
             ResourceID resourceId = ResourceID.generate();
             SlotReport slotReport = taskSlotTable.createSlotReport(resourceId);
@@ -336,7 +336,7 @@ public class TaskSlotTableImplTest extends TestLogger {
                                             null)),
                             is(
                                     new SlotStatus(
-                                            
SlotID.generateDynamicSlotID(resourceId),
+                                            new SlotID(resourceId, 4),
                                             
TaskSlotUtils.DEFAULT_RESOURCE_PROFILE,
                                             jobId,
                                             allocationId3))));

Reply via email to