This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6443a501ef26ebc0c30ddb630e75926757a12d3
Author: Yangze Guo <[email protected]>
AuthorDate: Fri Nov 27 14:58:16 2020 +0800

    [FLINK-15660][runtime] request slot should succeed if the slot is already 
allocated with the same allocation id
---
 .../taskexecutor/slot/TaskSlotTableImpl.java       | 37 ++++++----
 .../taskexecutor/slot/TaskSlotTableImplTest.java   | 78 ++++++++++++++++++++--
 2 files changed, 97 insertions(+), 18 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
index 10ac1c4..9d93d59 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
@@ -288,22 +288,13 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
 
         TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
-            LOG.info("Allocation ID {} is already allocated in {}.", 
allocationId, taskSlot);
-            return false;
-        }
-
-        if (taskSlots.containsKey(index)) {
-            TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
+            return isDuplicatedSlot(taskSlot, jobId, resourceProfile, index);
+        } else if (isIndexAlreadyTaken(index)) {
             LOG.info(
-                    "Slot with index {} already exist, with resource profile 
{}, job id {} and allocation id {}.",
+                    "The slot with index {} is already assigned to another 
allocation with id {}.",
                     index,
-                    duplicatedTaskSlot.getResourceProfile(),
-                    duplicatedTaskSlot.getJobId(),
-                    duplicatedTaskSlot.getAllocationId());
-            return duplicatedTaskSlot.getJobId().equals(jobId)
-                    && 
duplicatedTaskSlot.getAllocationId().equals(allocationId);
-        } else if (allocatedSlots.containsKey(allocationId)) {
-            return true;
+                    taskSlots.get(index).getAllocationId());
+            return false;
         }
 
         resourceProfile = index >= 0 ? defaultSlotResourceProfile : 
resourceProfile;
@@ -349,6 +340,24 @@ public class TaskSlotTableImpl<T extends TaskSlotPayload> 
implements TaskSlotTab
         return true;
     }
 
+    private boolean isDuplicatedSlot(
+            TaskSlot taskSlot, JobID jobId, ResourceProfile resourceProfile, 
int index) {
+        LOG.info(
+                "Slot with allocationId {} already exist, with resource 
profile {}, job id {} and index {}. The required index is {}.",
+                taskSlot.getAllocationId(),
+                taskSlot.getResourceProfile(),
+                taskSlot.getJobId(),
+                taskSlot.getIndex(),
+                index);
+        return taskSlot.getJobId().equals(jobId)
+                && taskSlot.getResourceProfile().equals(resourceProfile)
+                && (index < 0 || taskSlot.getIndex() == index);
+    }
+
+    private boolean isIndexAlreadyTaken(int index) {
+        return taskSlots.get(index) != null;
+    }
+
     @Override
     public boolean markSlotActive(AllocationID allocationId) throws 
SlotNotFoundException {
         checkRunning();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
index de02023..72e6717 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -115,17 +116,65 @@ public class TaskSlotTableImplTest extends TestLogger {
     }
 
     /**
-     * Tests that redundant slot allocation with the same AllocationID to a 
different slot is
-     * rejected.
+     * Tests that inconsistent static slot allocation with the same 
AllocationID to a different slot
+     * is rejected.
      */
     @Test
-    public void testRedundantSlotAllocation() throws Exception {
+    public void testInconsistentStaticSlotAllocation() throws Exception {
         try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = 
createTaskSlotTableAndStart(2)) {
             final JobID jobId = new JobID();
+            final AllocationID allocationId1 = new AllocationID();
+            final AllocationID allocationId2 = new AllocationID();
+
+            assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId1, 
SLOT_TIMEOUT), is(true));
+            assertThat(
+                    taskSlotTable.allocateSlot(1, jobId, allocationId1, 
SLOT_TIMEOUT), is(false));
+            assertThat(
+                    taskSlotTable.allocateSlot(0, jobId, allocationId2, 
SLOT_TIMEOUT), is(false));
+
+            assertThat(taskSlotTable.isAllocated(0, jobId, allocationId1), 
is(true));
+            assertThat(taskSlotTable.isSlotFree(1), is(true));
+
+            Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
+                    taskSlotTable.getAllocatedSlots(jobId);
+            assertThat(allocatedSlots.next().getIndex(), is(0));
+            assertThat(allocatedSlots.hasNext(), is(false));
+        }
+    }
+
+    /**
+     * Tests that inconsistent dynamic slot allocation with the same 
AllocationID to a different
+     * slot is rejected.
+     */
+    @Test
+    public void testInconsistentDynamicSlotAllocation() throws Exception {
+        try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = 
createTaskSlotTableAndStart(1)) {
+            final JobID jobId1 = new JobID();
+            final JobID jobId2 = new JobID();
             final AllocationID allocationId = new AllocationID();
 
+            assertThat(
+                    taskSlotTable.allocateSlot(-1, jobId1, allocationId, 
SLOT_TIMEOUT), is(true));
+            assertThat(
+                    taskSlotTable.allocateSlot(-1, jobId2, allocationId, 
SLOT_TIMEOUT), is(false));
+
+            assertThat(taskSlotTable.isAllocated(-1, jobId1, allocationId), 
is(true));
+
+            Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
+                    taskSlotTable.getAllocatedSlots(jobId1);
+            assertThat(allocatedSlots.next().getAllocationId(), 
is(allocationId));
+            assertThat(allocatedSlots.hasNext(), is(false));
+        }
+    }
+
+    @Test
+    public void testDuplicateStaticSlotAllocation() throws Exception {
+        try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = 
createTaskSlotTableAndStart(2)) {
+            final JobID jobId = new JobID();
+            final AllocationID allocationId = new AllocationID();
+
+            assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, 
SLOT_TIMEOUT), is(true));
             assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, 
SLOT_TIMEOUT), is(true));
-            assertThat(taskSlotTable.allocateSlot(1, jobId, allocationId, 
SLOT_TIMEOUT), is(false));
 
             assertThat(taskSlotTable.isAllocated(0, jobId, allocationId), 
is(true));
             assertThat(taskSlotTable.isSlotFree(1), is(true));
@@ -138,6 +187,27 @@ public class TaskSlotTableImplTest extends TestLogger {
     }
 
     @Test
+    public void testDuplicateDynamicSlotAllocation() throws Exception {
+        try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = 
createTaskSlotTableAndStart(1)) {
+            final JobID jobId = new JobID();
+            final AllocationID allocationId = new AllocationID();
+
+            assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, 
SLOT_TIMEOUT), is(true));
+            Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
+                    taskSlotTable.getAllocatedSlots(jobId);
+            TaskSlot<TaskSlotPayload> taskSlot1 = allocatedSlots.next();
+
+            assertThat(taskSlotTable.allocateSlot(-1, jobId, allocationId, 
SLOT_TIMEOUT), is(true));
+            allocatedSlots = taskSlotTable.getAllocatedSlots(jobId);
+            TaskSlot<TaskSlotPayload> taskSlot2 = allocatedSlots.next();
+
+            assertThat(taskSlotTable.isAllocated(-1, jobId, allocationId), 
is(true));
+            assertEquals(taskSlot1, taskSlot2);
+            assertThat(allocatedSlots.hasNext(), is(false));
+        }
+    }
+
+    @Test
     public void testFreeSlot() throws Exception {
         try (final TaskSlotTable<TaskSlotPayload> taskSlotTable = 
createTaskSlotTableAndStart(2)) {
             final JobID jobId = new JobID();

Reply via email to