This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit aa44ffc72a9a3839a1c39d8717c9d0e085c1296c Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Thu May 28 16:54:29 2020 +0200 [FLINK-18012] Deactivate slot timeout when calling TaskSlotTable.tryMarkSlotActive In order to avoid timing out activated slots, we also need to deactivate the slot timeout when TaskSlotTable.tryMarkSlotActive is called. This can happen if the response to JobMasterGateway.offerSlots arrived too late and the call timed out. This closes #12391. --- .../runtime/taskexecutor/slot/TaskSlotTable.java | 24 ++++++----- .../taskexecutor/slot/TaskSlotTableTest.java | 48 ++++++++++++++++++++++ 2 files changed, 62 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 2c361a1..ce418e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -237,18 +237,22 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { TaskSlot taskSlot = getTaskSlot(allocationId); if (taskSlot != null) { - if (taskSlot.markActive()) { - // unregister a potential timeout - LOG.info("Activate slot {}.", allocationId); + return markExistingSlotActive(taskSlot); + } else { + throw new SlotNotFoundException(allocationId); + } + } - timerService.unregisterTimeout(allocationId); + private boolean markExistingSlotActive(TaskSlot taskSlot) { + if (taskSlot.markActive()) { + // unregister a potential timeout + LOG.info("Activate slot {}.", taskSlot.getAllocationId()); - return true; - } else { - return false; - } + timerService.unregisterTimeout(taskSlot.getAllocationId()); + + return true; } else { - throw new SlotNotFoundException(allocationId); + return false; } } @@ -394,7 +398,7 @@ public class TaskSlotTable implements
TimeoutListener<AllocationID> { TaskSlot taskSlot = getTaskSlot(allocationId); if (taskSlot != null && taskSlot.isAllocated(jobId, allocationId)) { - return taskSlot.markActive(); + return markExistingSlotActive(taskSlot); } else { return false; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java index b6235b8..a8eb1ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableTest.java @@ -23,7 +23,9 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.TriFunction; import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; @@ -38,10 +40,13 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; /** * Tests for the {@link TaskSlotTable}. 
@@ -133,6 +138,49 @@ public class TaskSlotTableTest extends TestLogger { } } + @Test + public void testMarkSlotActiveDeactivatesSlotTimeout() throws Exception { + runDeactivateSlotTimeoutTest((taskSlotTable, jobId, allocationId) -> { + try { + return taskSlotTable.markSlotActive(allocationId); + } catch (SlotNotFoundException e) { + ExceptionUtils.rethrow(e); + return false; + } + }); + } + + @Test + public void testTryMarkSlotActiveDeactivatesSlotTimeout() throws Exception { + runDeactivateSlotTimeoutTest(TaskSlotTable::tryMarkSlotActive); + } + + private void runDeactivateSlotTimeoutTest(TriFunction<TaskSlotTable, JobID, AllocationID, Boolean> taskSlotTableAction) throws Exception { + final CompletableFuture<AllocationID> timeoutFuture = new CompletableFuture<>(); + final TestingSlotActions testingSlotActions = new TestingSlotActionsBuilder() + .setTimeoutSlotConsumer((allocationID, uuid) -> timeoutFuture.complete(allocationID)) + .build(); + + final TaskSlotTable taskSlotTable = createTaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN)); + + try { + taskSlotTable.start(testingSlotActions); + + final AllocationID allocationId = new AllocationID(); + final long timeout = 50L; + final JobID jobId = new JobID(); + assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, Time.milliseconds(timeout)), is(true)); + assertThat(taskSlotTableAction.apply(taskSlotTable, jobId, allocationId), is(true)); + + try { + timeoutFuture.get(timeout, TimeUnit.MILLISECONDS); + fail("The slot timeout should have been deactivated."); + } catch (TimeoutException expected) {} + } finally { + taskSlotTable.stop(); + } + } + @Nonnull private TaskSlotTable createTaskSlotTable(final Collection<ResourceProfile> resourceProfiles) { return new TaskSlotTable(