[FLINK-5093] Add proper shutdown of scheduled executor service in TimerService
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d3a3eeb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d3a3eeb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d3a3eeb Branch: refs/heads/master Commit: 4d3a3eeb7b2cfc2ead2a388f745a05184c7878ce Parents: eefcbbd Author: Till Rohrmann <[email protected]> Authored: Mon Nov 28 15:25:57 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Dec 23 20:54:26 2016 +0100 ---------------------------------------------------------------------- .../taskexecutor/TaskManagerServices.java | 4 ++- .../TaskManagerServicesConfiguration.java | 20 ++++++++++++-- .../runtime/taskexecutor/slot/TimerService.java | 29 +++++++++++++++++--- .../taskexecutor/TaskExecutorITCase.java | 2 +- .../taskexecutor/slot/TimerServiceTest.java | 2 +- 5 files changed, 47 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index b57fafe..ae5a383 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -204,7 +204,9 @@ public class TaskManagerServices { resourceProfiles.add(new ResourceProfile(1.0, 42L)); } - final TimerService<AllocationID> timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1)); + final TimerService<AllocationID> timerService = new TimerService<>( + new ScheduledThreadPoolExecutor(1), + taskManagerServicesConfiguration.getTimerServiceShutdownTimeout()); final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService); http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java index 3190a93..2c76372 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -26,6 +26,7 @@ import org.apache.flink.core.memory.HeapMemorySegment; import org.apache.flink.core.memory.HybridMemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.memory.MemoryManager; @@ -37,6 +38,7 @@ import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -63,6 +65,8 @@ public class TaskManagerServicesConfiguration { private final MetricRegistryConfiguration metricRegistryConfiguration; + private final long timerServiceShutdownTimeout; + public TaskManagerServicesConfiguration( InetAddress taskManagerAddress, String[] tmpDirPaths, @@ -72,7 +76,8 @@ public class TaskManagerServicesConfiguration { long configuredMemory, boolean preAllocateMemory, float memoryFraction, - MetricRegistryConfiguration metricRegistryConfiguration) { + MetricRegistryConfiguration metricRegistryConfiguration, + long timerServiceShutdownTimeout) { this.taskManagerAddress = checkNotNull(taskManagerAddress); this.tmpDirPaths = checkNotNull(tmpDirPaths); @@ -85,6 +90,10 @@ public class TaskManagerServicesConfiguration { this.memoryFraction = memoryFraction; this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration); + + checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " + + "service shutdown timeout must be greater or equal to 0."); + this.timerServiceShutdownTimeout = timerServiceShutdownTimeout; } // -------------------------------------------------------------------------------------------- @@ -128,6 +137,10 @@ public class TaskManagerServicesConfiguration { return metricRegistryConfiguration; } + public long getTimerServiceShutdownTimeout() { + return timerServiceShutdownTimeout; + } + // -------------------------------------------------------------------------------------------- // Parsing of Flink configuration // -------------------------------------------------------------------------------------------- @@ -188,7 +201,7 @@ public class TaskManagerServicesConfiguration { final MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration); - + long timerServiceShutdownTimeout = AkkaUtils.getTimeout(configuration).toMillis(); return new TaskManagerServicesConfiguration( remoteAddress, @@ -199,7 +212,8 @@ public class TaskManagerServicesConfiguration { configuredMemory, preAllocateMemory, memoryFraction, - metricRegistryConfiguration); + metricRegistryConfiguration, + timerServiceShutdownTimeout); } // -------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java index 14c9ab1..8ec9a2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.taskexecutor.slot; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -35,18 +37,28 @@ import java.util.concurrent.TimeUnit; */ public class TimerService<K> { + private static final Logger LOG = LoggerFactory.getLogger(TimerService.class); + /** Executor service for the scheduled timeouts */ private final ScheduledExecutorService scheduledExecutorService; + /** Timeout for the shutdown of the service. */ + private final long shutdownTimeout; + /** Map of currently active timeouts */ private final Map<K, Timeout<K>> timeouts; /** Listener which is notified about occurring timeouts */ private TimeoutListener<K> timeoutListener; - public TimerService(final ScheduledExecutorService scheduledExecutorService) { + public TimerService( + final ScheduledExecutorService scheduledExecutorService, + final long shutdownTimeout) { this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService); + Preconditions.checkArgument(shutdownTimeout >= 0L, "The shut down timeout must be larger than or equal than 0."); + this.shutdownTimeout = shutdownTimeout; + this.timeouts = new HashMap<>(16); this.timeoutListener = null; } @@ -65,6 +77,17 @@ public class TimerService<K> { timeoutListener = null; scheduledExecutorService.shutdown(); + + try { + if(!scheduledExecutorService.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { + LOG.debug("The scheduled executor service did not properly terminate. Shutting " + + "it down now."); + scheduledExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.debug("Could not properly await the termination of the scheduled executor service.", e); + scheduledExecutorService.shutdownNow(); + } } /** @@ -103,9 +126,7 @@ public class TimerService<K> { */ protected void unregisterAllTimeouts() { for (Timeout<K> timeout : timeouts.values()) { - if (timeout != null) { - timeout.cancel(); - } + timeout.cancel(); } timeouts.clear(); } http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 050db44..36fd65b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -106,7 +106,7 @@ public class TaskExecutorITCase { final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class); final BroadcastVariableManager broadcastVariableManager = mock(BroadcastVariableManager.class); final FileCache fileCache = mock(FileCache.class); - final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService)); + final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(resourceProfile), new TimerService<AllocationID>(scheduledExecutorService, 100L)); final JobManagerTable jobManagerTable = new JobManagerTable(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); http://git-wip-us.apache.org/repos/asf/flink/blob/4d3a3eeb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java index 9dd5f39..cad3624 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java @@ -48,7 +48,7 @@ public class TimerServiceTest { ScheduledFuture scheduledFuture = mock(ScheduledFuture.class); when(scheduledExecutorService.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))) .thenReturn(scheduledFuture); - TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService); + TimerService<AllocationID> timerService = new TimerService<>(scheduledExecutorService, 100L); TimeoutListener<AllocationID> listener = mock(TimeoutListener.class); timerService.start(listener);
