[FLINK-7871] [flip6] SlotPool should release unused slots to RM. This closes #5048.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0587bf83 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0587bf83 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0587bf83 Branch: refs/heads/master Commit: 0587bf830bd854537b678b3aac4187da44c1a64e Parents: 02ea508 Author: ifndef-SleePy <mmyy1...@gmail.com> Authored: Sat Feb 10 00:40:13 2018 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Feb 12 11:59:30 2018 +0100 ---------------------------------------------------------------------- .../flink/configuration/JobManagerOptions.java | 20 ++++++ .../flink/runtime/jobmaster/JobMaster.java | 12 +++- .../runtime/jobmaster/slotpool/SlotPool.java | 68 ++++++++++++++++++-- .../jobmaster/slotpool/SlotPoolRpcTest.java | 13 +++- .../jobmaster/slotpool/SlotPoolTest.java | 51 +++++++++++++++ 5 files changed, 155 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 9f61736..a9f3673 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -113,6 +113,26 @@ public class JobManagerOptions { .defaultValue(60L * 60L) .withDescription("The time in seconds after which a completed job expires and is purged from the job store."); + public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT = + key("slot.request.timeout") + .defaultValue(10 * 60 * 1000L) + .withDescription("The timeout in milliseconds for requesting a slot from Slot 
Pool."); + + public static final ConfigOption<Long> SLOT_REQUEST_RM_TIMEOUT = + key("slot.request.resourcemanager.timeout") + .defaultValue(10 * 1000L) + .withDescription("The timeout in milliseconds for sending a request to Resource Manager."); + + public static final ConfigOption<Long> SLOT_ALLOCATION_RM_TIMEOUT = + key("slot.allocation.resourcemanager.timeout") + .defaultValue(5 * 60 * 1000L) + .withDescription("The timeout in milliseconds for allocation a slot from Resource Manager."); + + public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT = + key("slot.idle.timeout") + .defaultValue(5 * 60 * 1000L) + .withDescription("The timeout in milliseconds for a idle slot in Slot Pool."); + // --------------------------------------------------------------------------------------------- private JobManagerOptions() { http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 7d845aa..a8f1154 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.metrics.MetricGroup; @@ -97,6 +98,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import 
org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.clock.SystemClock; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -277,7 +279,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever(); - this.slotPool = new SlotPool(rpcService, jobGraph.getJobID()); + this.slotPool = new SlotPool( + rpcService, + jobGraph.getJobID(), + SystemClock.getInstance(), + Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)), + Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_ALLOCATION_RM_TIMEOUT)), + Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_RM_TIMEOUT)), + Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT))); + this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); this.executionGraph = ExecutionGraphBuilder.buildGraph( http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index a56335c..ca492b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -53,6 +53,7 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -88,12 
+89,16 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS // ------------------------------------------------------------------------ - private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5); + private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(10); - private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10); + private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(5); private static final Time DEFAULT_TIMEOUT = Time.seconds(10); + private static final Time DEFAULT_IDLE_SLOT_TIMEOUT = Time.minutes(5); + + private static final int NUM_RELEASE_SLOT_TRIES = 3; + // ------------------------------------------------------------------------ private final JobID jobId; @@ -121,6 +126,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS /** Timeout for allocation round trips (RM -> launch TM -> offer slot). */ private final Time resourceManagerAllocationTimeout; + /** Timeout for releasing idle slots. */ + private final Time idleSlotTimeout; + private final Clock clock; /** Managers for the different slot sharing groups. 
*/ @@ -136,14 +144,16 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS // ------------------------------------------------------------------------ - public SlotPool(RpcService rpcService, JobID jobId) { + @VisibleForTesting + protected SlotPool(RpcService rpcService, JobID jobId) { this( rpcService, jobId, SystemClock.getInstance(), DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, - DEFAULT_TIMEOUT); + DEFAULT_TIMEOUT, + DEFAULT_IDLE_SLOT_TIMEOUT); } public SlotPool( @@ -152,13 +162,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS Clock clock, Time slotRequestTimeout, Time resourceManagerAllocationTimeout, - Time resourceManagerRequestTimeout) { + Time resourceManagerRequestTimeout, + Time idleSlotTimeout) { super(rpcService); this.jobId = checkNotNull(jobId); this.clock = checkNotNull(clock); this.timeout = checkNotNull(resourceManagerRequestTimeout); + this.idleSlotTimeout = checkNotNull(idleSlotTimeout); this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout); this.registeredTaskManagers = new HashSet<>(16); @@ -201,6 +213,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS } catch (Exception e) { throw new RuntimeException("This should never happen", e); } + + scheduleRunAsync(() -> checkIdleSlot(), idleSlotTimeout); } @Override @@ -267,6 +281,50 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS return providerAndOwner; } + /** + * Check the available slots, release the slot that is idle for a long time. 
+ */ + protected void checkIdleSlot() { + + // The timestamp in SlotAndTimestamp is relative + final long currentRelativeTimeMillis = clock.relativeTimeMillis(); + + final List<AllocatedSlot> expiredSlots = new ArrayList<>(); + + availableSlots.availableSlots.forEach((allocationID, slotAndTimestamp) -> { + + if (slotAndTimestamp != null && + currentRelativeTimeMillis - slotAndTimestamp.timestamp() > idleSlotTimeout.toMilliseconds()) { + + expiredSlots.add(slotAndTimestamp.slot()); + + } + }); + + for (AllocatedSlot expiredSlot : expiredSlots) { + final AllocationID allocationID = expiredSlot.getAllocationId(); + if (availableSlots.tryRemove(allocationID)) { + + log.info("Releasing idle slot {}.", allocationID); + + final CompletableFuture<Acknowledge> future = FutureUtils.retry( + () -> expiredSlot.getTaskManagerGateway().freeSlot( + allocationID, + new FlinkException("Releasing idle slot " + allocationID), + timeout), + NUM_RELEASE_SLOT_TRIES, + getMainThreadExecutor()); + + future.exceptionally(throwable -> { + log.warn("Releasing idle slot {} failed.", allocationID, throwable); + return null; + }); + } + } + + scheduleRunAsync(() -> checkIdleSlot(), idleSlotTimeout); + } + // ------------------------------------------------------------------------ // Resource Manager Connection // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java index e60a3d6..d8e4487 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java @@ -112,7 +112,8 @@ public class SlotPoolRpcTest extends TestLogger { SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), - Time.milliseconds(10L) // this is the timeout for the request tested here + Time.milliseconds(10L), // this is the timeout for the request tested here + TestingUtils.infiniteTime() ); try { @@ -147,6 +148,7 @@ public class SlotPoolRpcTest extends TestLogger { SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); try { @@ -189,6 +191,7 @@ public class SlotPoolRpcTest extends TestLogger { SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); try { @@ -236,6 +239,7 @@ public class SlotPoolRpcTest extends TestLogger { SystemClock.getInstance(), TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); try { @@ -305,6 +309,7 @@ public class SlotPoolRpcTest extends TestLogger { SystemClock.getInstance(), Time.milliseconds(10L), TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); final CompletableFuture<SlotRequestId> releaseSlotFuture = new CompletableFuture<>(); @@ -354,14 +359,16 @@ public class SlotPoolRpcTest extends TestLogger { Clock clock, Time slotRequestTimeout, Time resourceManagerAllocationTimeout, - Time resourceManagerRequestTimeout) { + Time resourceManagerRequestTimeout, + Time idleSlotTimeout) { super( rpcService, jobId, clock, slotRequestTimeout, resourceManagerAllocationTimeout, - resourceManagerRequestTimeout); + resourceManagerRequestTimeout, + idleSlotTimeout); releaseSlotConsumer = null; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java index ef79be1..5c0d661 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java @@ -40,6 +40,8 @@ import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.clock.SystemClock; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -591,6 +593,46 @@ public class SlotPoolTest extends TestLogger { } } + @Test + public void testCheckIdleSlot() throws Exception { + final SlotPool slotPool = new SlotPool( + rpcService, + jobId, + SystemClock.getInstance(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + timeout); + + try { + final List<AllocationID> freedSlots = new ArrayList<>(); + taskManagerGateway.setFreeSlotConsumer((tuple) -> { + freedSlots.add(tuple.f0); + }); + + final AllocationID expiredSlotID = new AllocationID(); + final AllocationID freshSlotID = new AllocationID(); + + slotPool.getAvailableSlots().add(createSlot(expiredSlotID), + SystemClock.getInstance().relativeTimeMillis() - timeout.toMilliseconds() - 1); + + // Add a 10 s floating buffer time. 
+ final long floatingTimeBuffer = 10000; + slotPool.getAvailableSlots().add(createSlot(freshSlotID), + SystemClock.getInstance().relativeTimeMillis() - timeout.toMilliseconds() + floatingTimeBuffer); + + slotPool.checkIdleSlot(); + + assertEquals(1, freedSlots.size()); + assertEquals(expiredSlotID, freedSlots.get(0)); + assertFalse(slotPool.getAvailableSlots().contains(expiredSlotID)); + assertTrue(slotPool.getAvailableSlots().contains(freshSlotID)); + + } finally { + RpcUtils.terminateRpcEndpoint(slotPool, timeout); + } + } + private static SlotPoolGateway setupSlotPool( SlotPool slotPool, ResourceManagerGateway resourceManagerGateway) throws Exception { @@ -602,4 +644,13 @@ public class SlotPoolTest extends TestLogger { return slotPool.getSelfGateway(SlotPoolGateway.class); } + + private AllocatedSlot createSlot(final AllocationID allocationId) { + return new AllocatedSlot( + allocationId, + taskManagerLocation, + 0, + ResourceProfile.UNKNOWN, + taskManagerGateway); + } }