This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa44ffc72a9a3839a1c39d8717c9d0e085c1296c
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Thu May 28 16:54:29 2020 +0200

    [FLINK-18012] Deactivate slot timeout when calling TaskSlotTable.tryMarkSlotActive
    
    To avoid timing out activated slots, we also need to deactivate the slot
    timeout when TaskSlotTable.tryMarkSlotActive is called. This can happen
    if the response to JobMasterGateway.offerSlots arrived too late and the
    request had already timed out.
    
    This closes #12391.
---
 .../runtime/taskexecutor/slot/TaskSlotTable.java   | 24 ++++++-----
 .../taskexecutor/slot/TaskSlotTableTest.java       | 48 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 2c361a1..ce418e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -237,18 +237,22 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                TaskSlot taskSlot = getTaskSlot(allocationId);
 
                if (taskSlot != null) {
-                       if (taskSlot.markActive()) {
-                               // unregister a potential timeout
-                               LOG.info("Activate slot {}.", allocationId);
+                       return markExistingSlotActive(taskSlot);
+               } else {
+                       throw new SlotNotFoundException(allocationId);
+               }
+       }
 
-                               timerService.unregisterTimeout(allocationId);
+       private boolean markExistingSlotActive(TaskSlot taskSlot) {
+               if (taskSlot.markActive()) {
+                       // unregister a potential timeout
+                       LOG.info("Activate slot {}.", 
taskSlot.getAllocationId());
 
-                               return true;
-                       } else {
-                               return false;
-                       }
+                       
timerService.unregisterTimeout(taskSlot.getAllocationId());
+
+                       return true;
                } else {
-                       throw new SlotNotFoundException(allocationId);
+                       return false;
                }
        }
 
@@ -394,7 +398,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                TaskSlot taskSlot = getTaskSlot(allocationId);
 
                if (taskSlot != null && taskSlot.isAllocated(jobId, 
allocationId)) {
-                       return taskSlot.markActive();
+                       return markExistingSlotActive(taskSlot);
                } else {
                        return false;
                }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
index b6235b8..a8eb1ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java
@@ -23,7 +23,9 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.TriFunction;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
@@ -38,10 +40,13 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link TaskSlotTable}.
@@ -133,6 +138,49 @@ public class TaskSlotTableTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testMarkSlotActiveDeactivatesSlotTimeout() throws Exception 
{
+               runDeactivateSlotTimeoutTest((taskSlotTable, jobId, 
allocationId) -> {
+                       try {
+                               return 
taskSlotTable.markSlotActive(allocationId);
+                       } catch (SlotNotFoundException e) {
+                               ExceptionUtils.rethrow(e);
+                               return false;
+                       }
+               });
+       }
+
+       @Test
+       public void testTryMarkSlotActiveDeactivatesSlotTimeout() throws 
Exception {
+               runDeactivateSlotTimeoutTest(TaskSlotTable::tryMarkSlotActive);
+       }
+
+       private void runDeactivateSlotTimeoutTest(TriFunction<TaskSlotTable, 
JobID, AllocationID, Boolean> taskSlotTableAction) throws Exception {
+               final CompletableFuture<AllocationID> timeoutFuture = new 
CompletableFuture<>();
+               final TestingSlotActions testingSlotActions = new 
TestingSlotActionsBuilder()
+                       .setTimeoutSlotConsumer((allocationID, uuid) -> 
timeoutFuture.complete(allocationID))
+                       .build();
+
+               final TaskSlotTable taskSlotTable = 
createTaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN));
+
+               try {
+                       taskSlotTable.start(testingSlotActions);
+
+                       final AllocationID allocationId = new AllocationID();
+                       final long timeout = 50L;
+                       final JobID jobId = new JobID();
+                       assertThat(taskSlotTable.allocateSlot(0, jobId, 
allocationId, Time.milliseconds(timeout)), is(true));
+                       assertThat(taskSlotTableAction.apply(taskSlotTable, 
jobId, allocationId), is(true));
+
+                       try {
+                               timeoutFuture.get(timeout, 
TimeUnit.MILLISECONDS);
+                               fail("The slot timeout should have been 
deactivated.");
+                       } catch (TimeoutException expected) {}
+               } finally {
+                       taskSlotTable.stop();
+               }
+       }
+
        @Nonnull
        private TaskSlotTable createTaskSlotTable(final 
Collection<ResourceProfile> resourceProfiles) {
                return new TaskSlotTable(

Reply via email to