[FLINK-5093] Add proper shutdown of scheduled executor service in TimerService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4afcc4ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4afcc4ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4afcc4ab

Branch: refs/heads/flip-6
Commit: 4afcc4abda1848f6beaeaccc190d078f1775fd12
Parents: 029db00
Author: Till Rohrmann <[email protected]>
Authored: Mon Nov 28 15:25:57 2016 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Mon Nov 28 17:12:57 2016 +0100

----------------------------------------------------------------------
 .../taskexecutor/TaskManagerServices.java       |  4 ++-
 .../TaskManagerServicesConfiguration.java       | 20 ++++++++++++--
 .../runtime/taskexecutor/slot/TimerService.java | 29 +++++++++++++++++---
 .../taskexecutor/TaskExecutorITCase.java        |  2 +-
 .../taskexecutor/slot/TimerServiceTest.java     |  2 +-
 5 files changed, 48 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 7966078..d53c328 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -175,7 +175,9 @@ public class TaskManagerServices {
                        resourceProfiles.add(new ResourceProfile(1.0, 42L));
                }
 
-               final TimerService<AllocationID> timerService = new 
TimerService<>(new ScheduledThreadPoolExecutor(1));
+               final TimerService<AllocationID> timerService = new 
TimerService<>(
+                       new ScheduledThreadPoolExecutor(1),
+                       
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
 
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(resourceProfiles, timerService);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 702142f..ff80bca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -35,6 +36,7 @@ import java.io.File;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -57,6 +59,8 @@ public class TaskManagerServicesConfiguration {
 
        private final float memoryFraction;
 
+       private final long timerServiceShutdownTimeout;
+
        public TaskManagerServicesConfiguration(
                InetAddress taskManagerAddress,
                String[] tmpDirPaths,
@@ -64,7 +68,8 @@ public class TaskManagerServicesConfiguration {
                int numberOfSlots,
                long configuredMemory,
                boolean preAllocateMemory,
-               float memoryFraction) {
+               float memoryFraction,
+               long timerServiceShutdownTimeout) {
 
                this.taskManagerAddress = checkNotNull(taskManagerAddress);
                this.tmpDirPaths = checkNotNull(tmpDirPaths);
@@ -74,6 +79,10 @@ public class TaskManagerServicesConfiguration {
                this.configuredMemory = configuredMemory;
                this.preAllocateMemory = preAllocateMemory;
                this.memoryFraction = memoryFraction;
+
+               checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
+                       "service shutdown timeout must be greater or equal to 
0.");
+               this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -107,6 +116,10 @@ public class TaskManagerServicesConfiguration {
                return preAllocateMemory;
        }
 
+       public long getTimerServiceShutdownTimeout() {
+               return timerServiceShutdownTimeout;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Parsing of Flink configuration
        // 
--------------------------------------------------------------------------------------------
@@ -161,6 +174,8 @@ public class TaskManagerServicesConfiguration {
                        ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
                        "MemoryManager fraction of the free memory must be 
between 0.0 and 1.0");
 
+               long timerServiceShutdownTimeout = 
AkkaUtils.getTimeout(configuration).toMillis();
+
                return new TaskManagerServicesConfiguration(
                        remoteAddress,
                        tmpDirs,
@@ -168,7 +183,8 @@ public class TaskManagerServicesConfiguration {
                        slots,
                        configuredMemory,
                        preAllocateMemory,
-                       memoryFraction);
+                       memoryFraction,
+                       timerServiceShutdownTimeout);
        }
 
        // 
--------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
index 14c9ab1..8ec9a2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.taskexecutor.slot;
 
 import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -35,18 +37,28 @@ import java.util.concurrent.TimeUnit;
  */
 public class TimerService<K> {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(TimerService.class);
+
        /** Executor service for the scheduled timeouts */
        private final ScheduledExecutorService scheduledExecutorService;
 
+       /** Timeout for the shutdown of the service. */
+       private final long shutdownTimeout;
+
        /** Map of currently active timeouts */
        private final Map<K, Timeout<K>> timeouts;
 
        /** Listener which is notified about occurring timeouts */
        private TimeoutListener<K> timeoutListener;
 
-       public TimerService(final ScheduledExecutorService 
scheduledExecutorService) {
+       public TimerService(
+                       final ScheduledExecutorService scheduledExecutorService,
+                       final long shutdownTimeout) {
                this.scheduledExecutorService = 
Preconditions.checkNotNull(scheduledExecutorService);
 
+               Preconditions.checkArgument(shutdownTimeout >= 0L, "The shut 
down timeout must be larger than or equal than 0.");
+               this.shutdownTimeout = shutdownTimeout;
+
                this.timeouts = new HashMap<>(16);
                this.timeoutListener = null;
        }
@@ -65,6 +77,17 @@ public class TimerService<K> {
                timeoutListener = null;
 
                scheduledExecutorService.shutdown();
+
+               try {
+                       
if(!scheduledExecutorService.awaitTermination(shutdownTimeout, 
TimeUnit.MILLISECONDS)) {
+                               LOG.debug("The scheduled executor service did 
not properly terminate. Shutting " +
+                                       "it down now.");
+                               scheduledExecutorService.shutdownNow();
+                       }
+               } catch (InterruptedException e) {
+                       LOG.debug("Could not properly await the termination of 
the scheduled executor service.", e);
+                       scheduledExecutorService.shutdownNow();
+               }
        }
 
        /**
@@ -103,9 +126,7 @@ public class TimerService<K> {
         */
        protected void unregisterAllTimeouts() {
                for (Timeout<K> timeout : timeouts.values()) {
-                       if (timeout != null) {
-                               timeout.cancel();
-                       }
+                       timeout.cancel();
                }
                timeouts.clear();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 050db44..36fd65b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -106,7 +106,7 @@ public class TaskExecutorITCase {
                final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
                final BroadcastVariableManager broadcastVariableManager = 
mock(BroadcastVariableManager.class);
                final FileCache fileCache = mock(FileCache.class);
-               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(resourceProfile), new 
TimerService<AllocationID>(scheduledExecutorService));
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(resourceProfile), new 
TimerService<AllocationID>(scheduledExecutorService, 100L));
                final JobManagerTable jobManagerTable = new JobManagerTable();
                final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4afcc4ab/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
index 9dd5f39..cad3624 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TimerServiceTest.java
@@ -48,7 +48,7 @@ public class TimerServiceTest {
                ScheduledFuture scheduledFuture = mock(ScheduledFuture.class);
                when(scheduledExecutorService.schedule(any(Runnable.class), 
anyLong(), any(TimeUnit.class)))
                        .thenReturn(scheduledFuture);
-               TimerService<AllocationID> timerService = new 
TimerService<>(scheduledExecutorService);
+               TimerService<AllocationID> timerService = new 
TimerService<>(scheduledExecutorService, 100L);
                TimeoutListener<AllocationID> listener = 
mock(TimeoutListener.class);
 
                timerService.start(listener);

Reply via email to