This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8dc257dcd3907cd42c15bd1748e4e1954e15696c Author: Yangze Guo <[email protected]> AuthorDate: Tue Dec 22 15:14:46 2020 +0800 [FLINK-20837][runtime] Refactor dynamic SlotID This closes #14560 --- .../runtime/clusterframework/types/SlotID.java | 4 +-- .../taskexecutor/slot/TaskSlotTableImpl.java | 35 ++++++++++++++-------- .../runtime/taskexecutor/TaskExecutorTest.java | 4 +-- .../taskexecutor/slot/TaskSlotTableImplTest.java | 14 ++++----- 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java index 36628a1..dc8aa80 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java @@ -90,8 +90,8 @@ public class SlotID implements ResourceIDRetrievable, Serializable { return resourceId + "_" + (slotNumber >= 0 ? slotNumber : "dynamic"); } - /** Generate a SlotID without actual slot index for dynamic slot allocation. */ - public static SlotID generateDynamicSlotID(ResourceID resourceID) { + /** Get a SlotID without actual slot index for dynamic slot allocation. */ + public static SlotID getDynamicSlotID(ResourceID resourceID) { return new SlotID(resourceID); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java index 9d93d59..33ccbbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java @@ -95,6 +95,9 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab /** The table state. */ private volatile State state; + /** Current index for dynamic slot, should always not less than numberSlots */ + private int dynamicSlotIndex; + private final ResourceBudgetManager budgetManager; /** The closing future is completed when all slot are freed and state is closed. */ @@ -120,6 +123,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab 0 < numberSlots, "The number of task slots must be greater than 0."); this.numberSlots = numberSlots; + this.dynamicSlotIndex = numberSlots; this.defaultSlotResourceProfile = Preconditions.checkNotNull(defaultSlotResourceProfile); this.memoryPageSize = memoryPageSize; @@ -247,11 +251,10 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab } for (TaskSlot<T> taskSlot : allocatedSlots.values()) { - if (taskSlot.getIndex() < 0) { - SlotID slotID = SlotID.generateDynamicSlotID(resourceId); + if (isDynamicIndex(taskSlot.getIndex())) { SlotStatus slotStatus = new SlotStatus( - slotID, + new SlotID(resourceId, taskSlot.getIndex()), taskSlot.getResourceProfile(), taskSlot.getJobId(), taskSlot.getAllocationId()); @@ -277,14 +280,18 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab @Override public boolean allocateSlot( - int index, + int requestedIndex, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) { checkRunning(); - Preconditions.checkArgument(index < numberSlots); + Preconditions.checkArgument(requestedIndex < numberSlots); + + // The negative requestIndex indicate that the SlotManger allocate a dynamic slot, we + // transfer the index to an increasing number not less than the numberSlots. + int index = requestedIndex < 0 ? nextDynamicSlotIndex() : requestedIndex; TaskSlot<T> taskSlot = allocatedSlots.get(allocationId); if (taskSlot != null) { @@ -297,7 +304,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab return false; } - resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile; + resourceProfile = isDynamicIndex(index) ? resourceProfile : defaultSlotResourceProfile; if (!budgetManager.reserve(resourceProfile)) { LOG.info( @@ -317,9 +324,7 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab jobId, allocationId, memoryVerificationExecutor); - if (index >= 0) { - taskSlots.put(index, taskSlot); - } + taskSlots.put(index, taskSlot); // update the allocation id to task slot map allocatedSlots.put(allocationId, taskSlot); @@ -351,13 +356,17 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab index); return taskSlot.getJobId().equals(jobId) && taskSlot.getResourceProfile().equals(resourceProfile) - && (index < 0 || taskSlot.getIndex() == index); + && (isDynamicIndex(index) || taskSlot.getIndex() == index); } private boolean isIndexAlreadyTaken(int index) { return taskSlots.get(index) != null; } + private boolean isDynamicIndex(int index) { + return index >= numberSlots; + } + @Override public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException { checkRunning(); @@ -469,8 +478,6 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab TaskSlot<T> taskSlot = taskSlots.get(index); if (taskSlot != null) { return taskSlot.isAllocated(jobId, allocationId); - } else if (index < 0) { - return allocatedSlots.containsKey(allocationId); } else { return false; } @@ -625,6 +632,10 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab return allocatedSlots.get(allocationId); } + private int nextDynamicSlotIndex() { + return dynamicSlotIndex++; + } + private void checkRunning() { Preconditions.checkState( state == State.RUNNING, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 9f94c07..7f0f558 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -2075,7 +2075,7 @@ public class TaskExecutorTest extends TestLogger { .taskExecutor .getSelfGateway(TaskExecutorGateway.class) .requestSlot( - SlotID.generateDynamicSlotID(ResourceID.generate()), + SlotID.getDynamicSlotID(ResourceID.generate()), jobId, allocationId, resourceProfile, @@ -2092,7 +2092,7 @@ public class TaskExecutorTest extends TestLogger { new SlotStatus(new SlotID(resourceId, 0), DEFAULT_RESOURCE_PROFILE), new SlotStatus(new SlotID(resourceId, 1), DEFAULT_RESOURCE_PROFILE), new SlotStatus( - SlotID.generateDynamicSlotID(resourceId), + new SlotID(resourceId, 2), resourceProfile, jobId, allocationId))); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java index 72e6717..067f8b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java @@ -158,7 +158,7 @@ public class TaskSlotTableImplTest extends TestLogger { assertThat( taskSlotTable.allocateSlot(-1, jobId2, allocationId, SLOT_TIMEOUT), is(false)); - assertThat(taskSlotTable.isAllocated(-1, jobId1, allocationId), is(true)); + assertThat(taskSlotTable.isAllocated(1, jobId1, allocationId), is(true)); Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId1); @@ -201,7 +201,7 @@ public class TaskSlotTableImplTest extends TestLogger { allocatedSlots = taskSlotTable.getAllocatedSlots(jobId); TaskSlot<TaskSlotPayload> taskSlot2 = allocatedSlots.next(); - assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true)); + assertThat(taskSlotTable.isAllocated(1, jobId, allocationId), is(true)); assertEquals(taskSlot1, taskSlot2); assertThat(allocatedSlots.hasNext(), is(false)); } @@ -239,9 +239,9 @@ public class TaskSlotTableImplTest extends TestLogger { Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId); - assertThat(allocatedSlots.next().getIndex(), is(-1)); + assertThat(allocatedSlots.next().getIndex(), is(2)); assertThat(allocatedSlots.hasNext(), is(false)); - assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true)); + assertThat(taskSlotTable.isAllocated(2, jobId, allocationId), is(true)); } } @@ -262,7 +262,7 @@ public class TaskSlotTableImplTest extends TestLogger { Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = taskSlotTable.getAllocatedSlots(jobId); TaskSlot<TaskSlotPayload> allocatedSlot = allocatedSlots.next(); - assertThat(allocatedSlot.getIndex(), is(-1)); + assertThat(allocatedSlot.getIndex(), is(2)); assertThat(allocatedSlot.getResourceProfile(), is(resourceProfile)); assertThat(allocatedSlots.hasNext(), is(false)); } @@ -305,7 +305,7 @@ public class TaskSlotTableImplTest extends TestLogger { taskSlotTable.allocateSlot(-1, jobId, allocationId3, SLOT_TIMEOUT), is(true)); // index 4 - assertThat(taskSlotTable.freeSlot(allocationId2), is(-1)); + assertThat(taskSlotTable.freeSlot(allocationId2), is(3)); ResourceID resourceId = ResourceID.generate(); SlotReport slotReport = taskSlotTable.createSlotReport(resourceId); @@ -336,7 +336,7 @@ public class TaskSlotTableImplTest extends TestLogger { null)), is( new SlotStatus( - SlotID.generateDynamicSlotID(resourceId), + new SlotID(resourceId, 4), TaskSlotUtils.DEFAULT_RESOURCE_PROFILE, jobId, allocationId3))));
