This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6443a501ef26ebc0c30ddb630e75926757a12d3 Author: Yangze Guo <[email protected]> AuthorDate: Fri Nov 27 14:58:16 2020 +0800 [FLINK-15660][runtime] request slot should success if the slot is already allocated with the same allocation id --- .../taskexecutor/slot/TaskSlotTableImpl.java | 37 ++++++---- .../taskexecutor/slot/TaskSlotTableImplTest.java | 78 ++++++++++++++++++++-- 2 files changed, 97 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java index 10ac1c4..9d93d59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java @@ -288,22 +288,13 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab TaskSlot<T> taskSlot = allocatedSlots.get(allocationId); if (taskSlot != null) { - LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot); - return false; - } - - if (taskSlots.containsKey(index)) { - TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index); + return isDuplicatedSlot(taskSlot, jobId, resourceProfile, index); + } else if (isIndexAlreadyTaken(index)) { LOG.info( - "Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.", + "The slot with index {} is already assigned to another allocation with id {}.", index, - duplicatedTaskSlot.getResourceProfile(), - duplicatedTaskSlot.getJobId(), - duplicatedTaskSlot.getAllocationId()); - return duplicatedTaskSlot.getJobId().equals(jobId) - && duplicatedTaskSlot.getAllocationId().equals(allocationId); - } else if (allocatedSlots.containsKey(allocationId)) { - return true; + taskSlots.get(index).getAllocationId()); + return false; } resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile; @@ -349,6 +340,24 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> implements TaskSlotTab return true; } + private boolean isDuplicatedSlot( + TaskSlot taskSlot, JobID jobId, ResourceProfile resourceProfile, int index) { + LOG.info( + "Slot with allocationId {} already exist, with resource profile {}, job id {} and index {}. The required index is {}.", + taskSlot.getAllocationId(), + taskSlot.getResourceProfile(), + taskSlot.getJobId(), + taskSlot.getIndex(), + index); + return taskSlot.getJobId().equals(jobId) + && taskSlot.getResourceProfile().equals(resourceProfile) + && (index < 0 || taskSlot.getIndex() == index); + } + + private boolean isIndexAlreadyTaken(int index) { + return taskSlots.get(index) != null; + } + @Override public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException { checkRunning(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java index de02023..72e6717 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -115,17 +116,65 @@ public class TaskSlotTableImplTest extends TestLogger { } /** - * Tests that redundant slot allocation with the same AllocationID to a different slot is - * rejected. + * Tests that inconsistent static slot allocation with the same AllocationID to a different slot + * is rejected. */ @Test - public void testRedundantSlotAllocation() throws Exception { + public void testInconsistentStaticSlotAllocation() throws Exception { try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) { final JobID jobId = new JobID(); + final AllocationID allocationId1 = new AllocationID(); + final AllocationID allocationId2 = new AllocationID(); + + assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId1, SLOT_TIMEOUT), is(true)); + assertThat( + taskSlotTable.allocateSlot(1, jobId, allocationId1, SLOT_TIMEOUT), is(false)); + assertThat( + taskSlotTable.allocateSlot(0, jobId, allocationId2, SLOT_TIMEOUT), is(false)); + + assertThat(taskSlotTable.isAllocated(0, jobId, allocationId1), is(true)); + assertThat(taskSlotTable.isSlotFree(1), is(true)); + + Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = + taskSlotTable.getAllocatedSlots(jobId); + assertThat(allocatedSlots.next().getIndex(), is(0)); + assertThat(allocatedSlots.hasNext(), is(false)); + } + } + + /** + * Tests that inconsistent dynamic slot allocation with the same AllocationID to a different + * slot is rejected. + */ + @Test + public void testInconsistentDynamicSlotAllocation() throws Exception { + try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(1)) { + final JobID jobId1 = new JobID(); + final JobID jobId2 = new JobID(); final AllocationID allocationId = new AllocationID(); + assertThat( + taskSlotTable.allocateSlot(-1, jobId1, allocationId, SLOT_TIMEOUT), is(true)); + assertThat( + taskSlotTable.allocateSlot(-1, jobId2, allocationId, SLOT_TIMEOUT), is(false)); + + assertThat(taskSlotTable.isAllocated(-1, jobId1, allocationId), is(true)); + + Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = + taskSlotTable.getAllocatedSlots(jobId1); + assertThat(allocatedSlots.next().getAllocationId(), is(allocationId)); + assertThat(allocatedSlots.hasNext(), is(false)); + } + } + + @Test + public void testDuplicateStaticSlotAllocation() throws Exception { + try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) { + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + + assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT), is(true)); assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT), is(true)); - assertThat(taskSlotTable.allocateSlot(1, jobId, allocationId, SLOT_TIMEOUT), is(false)); assertThat(taskSlotTable.isAllocated(0, jobId, allocationId), is(true)); assertThat(taskSlotTable.isSlotFree(1), is(true)); @@ -138,6 +187,27 @@ public class TaskSlotTableImplTest extends TestLogger { } @Test + public void testDuplicateDynamicSlotAllocation() throws Exception { + try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(1)) { + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + + assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT), is(true)); + Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots = + taskSlotTable.getAllocatedSlots(jobId); + TaskSlot<TaskSlotPayload> taskSlot1 = allocatedSlots.next(); + + assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, SLOT_TIMEOUT), is(true)); + allocatedSlots = taskSlotTable.getAllocatedSlots(jobId); + TaskSlot<TaskSlotPayload> taskSlot2 = allocatedSlots.next(); + + assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), is(true)); + assertEquals(taskSlot1, taskSlot2); + assertThat(allocatedSlots.hasNext(), is(false)); + } + } + + @Test public void testFreeSlot() throws Exception { try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = createTaskSlotTableAndStart(2)) { final JobID jobId = new JobID();
