[FLINK-7871] [flip6] SlotPool should release unused slots to RM

This closes #5048.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0587bf83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0587bf83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0587bf83

Branch: refs/heads/master
Commit: 0587bf830bd854537b678b3aac4187da44c1a64e
Parents: 02ea508
Author: ifndef-SleePy <mmyy1...@gmail.com>
Authored: Sat Feb 10 00:40:13 2018 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 12 11:59:30 2018 +0100

----------------------------------------------------------------------
 .../flink/configuration/JobManagerOptions.java  | 20 ++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 12 +++-
 .../runtime/jobmaster/slotpool/SlotPool.java    | 68 ++++++++++++++++++--
 .../jobmaster/slotpool/SlotPoolRpcTest.java     | 13 +++-
 .../jobmaster/slotpool/SlotPoolTest.java        | 51 +++++++++++++++
 5 files changed, 155 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 9f61736..a9f3673 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -113,6 +113,26 @@ public class JobManagerOptions {
                .defaultValue(60L * 60L)
                .withDescription("The time in seconds after which a completed job expires and is purged from the job store.");
 
+       public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT =
+               key("slot.request.timeout")
+               .defaultValue(10 * 60 * 1000L)
+               .withDescription("The timeout in milliseconds for requesting a slot from Slot Pool.");
+
+       public static final ConfigOption<Long> SLOT_REQUEST_RM_TIMEOUT =
+               key("slot.request.resourcemanager.timeout")
+                       .defaultValue(10 * 1000L)
+                       .withDescription("The timeout in milliseconds for sending a request to Resource Manager.");
+
+       public static final ConfigOption<Long> SLOT_ALLOCATION_RM_TIMEOUT =
+               key("slot.allocation.resourcemanager.timeout")
+                       .defaultValue(5 * 60 * 1000L)
+                       .withDescription("The timeout in milliseconds for allocation a slot from Resource Manager.");
+
+       public static final ConfigOption<Long> SLOT_IDLE_TIMEOUT =
+               key("slot.idle.timeout")
+                       .defaultValue(5 * 60 * 1000L)
+                       .withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
+
        // ---------------------------------------------------------------------------------------------
 
        private JobManagerOptions() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7d845aa..a8f1154 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
@@ -97,6 +98,7 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.clock.SystemClock;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -277,7 +279,15 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
 
-               this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
+               this.slotPool = new SlotPool(
+                       rpcService,
+                       jobGraph.getJobID(),
+                       SystemClock.getInstance(),
+                       Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT)),
+                       Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_ALLOCATION_RM_TIMEOUT)),
+                       Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_RM_TIMEOUT)),
+                       Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT)));
+
                this.slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);
 
                this.executionGraph = ExecutionGraphBuilder.buildGraph(

http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index a56335c..ca492b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -53,6 +53,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -88,12 +89,16 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
        // 
------------------------------------------------------------------------
 
-       private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = 
Time.minutes(5);
+       private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = 
Time.minutes(10);
 
-       private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = 
Time.minutes(10);
+       private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = 
Time.minutes(5);
 
        private static final Time DEFAULT_TIMEOUT = Time.seconds(10);
 
+       private static final Time DEFAULT_IDLE_SLOT_TIMEOUT = Time.minutes(5);
+
+       private static final int NUM_RELEASE_SLOT_TRIES = 3;
+
        // 
------------------------------------------------------------------------
 
        private final JobID jobId;
@@ -121,6 +126,9 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        /** Timeout for allocation round trips (RM -> launch TM -> offer slot). 
*/
        private final Time resourceManagerAllocationTimeout;
 
+       /** Timeout for releasing idle slots. */
+       private final Time idleSlotTimeout;
+
        private final Clock clock;
 
        /** Managers for the different slot sharing groups. */
@@ -136,14 +144,16 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
        // 
------------------------------------------------------------------------
 
-       public SlotPool(RpcService rpcService, JobID jobId) {
+       @VisibleForTesting
+       protected SlotPool(RpcService rpcService, JobID jobId) {
                this(
                        rpcService,
                        jobId,
                        SystemClock.getInstance(),
                        DEFAULT_SLOT_REQUEST_TIMEOUT,
                        DEFAULT_RM_ALLOCATION_TIMEOUT,
-                       DEFAULT_TIMEOUT);
+                       DEFAULT_TIMEOUT,
+                       DEFAULT_IDLE_SLOT_TIMEOUT);
        }
 
        public SlotPool(
@@ -152,13 +162,15 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        Clock clock,
                        Time slotRequestTimeout,
                        Time resourceManagerAllocationTimeout,
-                       Time resourceManagerRequestTimeout) {
+                       Time resourceManagerRequestTimeout,
+                       Time idleSlotTimeout) {
 
                super(rpcService);
 
                this.jobId = checkNotNull(jobId);
                this.clock = checkNotNull(clock);
                this.timeout = checkNotNull(resourceManagerRequestTimeout);
+               this.idleSlotTimeout = checkNotNull(idleSlotTimeout);
                this.resourceManagerAllocationTimeout = 
checkNotNull(resourceManagerAllocationTimeout);
 
                this.registeredTaskManagers = new HashSet<>(16);
@@ -201,6 +213,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                } catch (Exception e) {
                        throw new RuntimeException("This should never happen", e);
                }
+
+               scheduleRunAsync(() -> checkIdleSlot(), idleSlotTimeout);
        }
 
        @Override
@@ -267,6 +281,50 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                return providerAndOwner;
        }
 
+       /**
+        * Check the available slots, release the slot that is idle for a long time.
+        */
+       protected void checkIdleSlot() {
+
+               // The timestamp in SlotAndTimestamp is relative
+               final long currentRelativeTimeMillis = 
clock.relativeTimeMillis();
+
+               final List<AllocatedSlot> expiredSlots = new ArrayList<>();
+
+               availableSlots.availableSlots.forEach((allocationID, 
slotAndTimestamp) -> {
+
+                       if (slotAndTimestamp != null &&
+                               currentRelativeTimeMillis - 
slotAndTimestamp.timestamp() > idleSlotTimeout.toMilliseconds()) {
+
+                               expiredSlots.add(slotAndTimestamp.slot());
+
+                       }
+               });
+
+               for (AllocatedSlot expiredSlot : expiredSlots) {
+                       final AllocationID allocationID = 
expiredSlot.getAllocationId();
+                       if (availableSlots.tryRemove(allocationID)) {
+
+                               log.info("Releasing idle slot {}.", allocationID);
+
+                               final CompletableFuture<Acknowledge> future = 
FutureUtils.retry(
+                                       () -> 
expiredSlot.getTaskManagerGateway().freeSlot(
+                                               allocationID,
+                                               new FlinkException("Releasing idle slot " + allocationID),
+                                               timeout),
+                                       NUM_RELEASE_SLOT_TRIES,
+                                       getMainThreadExecutor());
+
+                               future.exceptionally(throwable -> {
+                                       log.warn("Releasing idle slot {} failed.", allocationID, throwable);
+                                       return null;
+                               });
+                       }
+               }
+
+               scheduleRunAsync(() -> checkIdleSlot(), idleSlotTimeout);
+       }
+
        // 
------------------------------------------------------------------------
        //  Resource Manager Connection
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index e60a3d6..d8e4487 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -112,7 +112,8 @@ public class SlotPoolRpcTest extends TestLogger {
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
-                       Time.milliseconds(10L) // this is the timeout for the 
request tested here
+                       Time.milliseconds(10L), // this is the timeout for the 
request tested here
+                       TestingUtils.infiniteTime()
                );
 
                try {
@@ -147,6 +148,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
                try {
@@ -189,6 +191,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
                try {
@@ -236,6 +239,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
                try {
@@ -305,6 +309,7 @@ public class SlotPoolRpcTest extends TestLogger {
                        SystemClock.getInstance(),
                        Time.milliseconds(10L),
                        TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
                final CompletableFuture<SlotRequestId> releaseSlotFuture = new 
CompletableFuture<>();
@@ -354,14 +359,16 @@ public class SlotPoolRpcTest extends TestLogger {
                                Clock clock,
                                Time slotRequestTimeout,
                                Time resourceManagerAllocationTimeout,
-                               Time resourceManagerRequestTimeout) {
+                               Time resourceManagerRequestTimeout,
+                               Time idleSlotTimeout) {
                        super(
                                rpcService,
                                jobId,
                                clock,
                                slotRequestTimeout,
                                resourceManagerAllocationTimeout,
-                               resourceManagerRequestTimeout);
+                               resourceManagerRequestTimeout,
+                               idleSlotTimeout);
 
                        releaseSlotConsumer = null;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/0587bf83/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index ef79be1..5c0d661 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -40,6 +40,8 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.clock.SystemClock;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -591,6 +593,46 @@ public class SlotPoolTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testCheckIdleSlot() throws Exception {
+               final SlotPool slotPool = new SlotPool(
+                       rpcService,
+                       jobId,
+                       SystemClock.getInstance(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       timeout);
+
+               try {
+                       final List<AllocationID> freedSlots = new ArrayList<>();
+                       taskManagerGateway.setFreeSlotConsumer((tuple) -> {
+                               freedSlots.add(tuple.f0);
+                       });
+
+                       final AllocationID expiredSlotID = new AllocationID();
+                       final AllocationID freshSlotID = new AllocationID();
+
+                       
slotPool.getAvailableSlots().add(createSlot(expiredSlotID),
+                               SystemClock.getInstance().relativeTimeMillis() 
- timeout.toMilliseconds() - 1);
+
+                       // Add a 10 s floating buffer time.
+                       final long floatingTimeBuffer = 10000;
+                       
slotPool.getAvailableSlots().add(createSlot(freshSlotID),
+                               SystemClock.getInstance().relativeTimeMillis() 
- timeout.toMilliseconds() + floatingTimeBuffer);
+
+                       slotPool.checkIdleSlot();
+
+                       assertEquals(1, freedSlots.size());
+                       assertEquals(expiredSlotID, freedSlots.get(0));
+                       
assertFalse(slotPool.getAvailableSlots().contains(expiredSlotID));
+                       
assertTrue(slotPool.getAvailableSlots().contains(freshSlotID));
+
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+               }
+       }
+
        private static SlotPoolGateway setupSlotPool(
                        SlotPool slotPool,
                        ResourceManagerGateway resourceManagerGateway) throws 
Exception {
@@ -602,4 +644,13 @@ public class SlotPoolTest extends TestLogger {
 
                return slotPool.getSelfGateway(SlotPoolGateway.class);
        }
+
+       private AllocatedSlot createSlot(final AllocationID allocationId) {
+               return new AllocatedSlot(
+                       allocationId,
+                       taskManagerLocation,
+                       0,
+                       ResourceProfile.UNKNOWN,
+                       taskManagerGateway);
+       }
 }

Reply via email to