[FLINK-9304] Timer service shutdown should not stop if interrupted

This closes #5962.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4e03689
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4e03689
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4e03689

Branch: refs/heads/master
Commit: f4e03689dd5fef8eafeb0996a31ea021c5ea2203
Parents: d734032
Author: Stefan Richter <[email protected]>
Authored: Mon May 7 11:55:35 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue May 15 00:23:10 2018 +0200

----------------------------------------------------------------------
 .../runtime/tasks/ProcessingTimeService.java    |  11 ++
 .../streaming/runtime/tasks/StreamTask.java     |  44 +++---
 .../tasks/SystemProcessingTimeService.java      |  32 +++++
 .../tasks/TestProcessingTimeService.java        |   6 +
 .../tasks/SystemProcessingTimeServiceTest.java  | 133 ++++++++++++++-----
 5 files changed, 168 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 2516299..4515ce2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -96,6 +96,17 @@ public abstract class ProcessingTimeService {
        public abstract void shutdownService();
 
        /**
+        * Shuts down and clean up the timer service provider hard and immediately. This does not wait
+        * for any timer to complete. Any further call to {@link #registerTimer(long, ProcessingTimeCallback)}
+        * will result in a hard exception. This call cannot be interrupted and will block until the shutdown is completed
+        * or the timeout is exceeded.
+        *
+        * @param timeoutMs timeout for blocking on the service shutdown in milliseconds.
+        * @return returns true iff the shutdown was completed.
+        */
+       public abstract boolean shutdownServiceUninterruptible(long timeoutMs);
+
+       /**
         * Shuts down and clean up the timer service provider hard and immediately. This does wait
         * for all timers to complete or until the time limit is exceeded. Any call to
         * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method.

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6790949..2cc8886 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -73,7 +73,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -348,30 +347,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                        // clean up everything we initialized
                        isRunning = false;
 
-                       // clear the interrupted status so that we can wait for 
the following resource shutdowns to complete
-                       Thread.interrupted();
-
                        // stop all timers and threads
-                       if (timerService != null && 
!timerService.isTerminated()) {
-                               try {
-
-                                       final long timeoutMs = 
getEnvironment().getTaskManagerInfo().getConfiguration().
-                                               
getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
-
-                                       // wait for a reasonable time for all 
pending timer threads to finish
-                                       boolean timerShutdownComplete =
-                                               
timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS);
-
-                                       if (!timerShutdownComplete) {
-                                               LOG.warn("Timer service 
shutdown exceeded time limit of {} ms while waiting for pending " +
-                                                       "timers. Will continue 
with shutdown procedure.", timeoutMs);
-                                       }
-                               }
-                               catch (Throwable t) {
-                                       // catch and log the exception to not 
replace the original exception
-                                       LOG.error("Could not shut down timer 
service", t);
-                               }
-                       }
+                       tryShutdownTimerService();
 
                        // stop all asynchronous checkpoint threads
                        try {
@@ -706,6 +683,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                }
        }
 
+       private void tryShutdownTimerService() {
+
+               if (timerService != null && !timerService.isTerminated()) {
+
+                       try {
+                               final long timeoutMs = 
getEnvironment().getTaskManagerInfo().getConfiguration().
+                                       
getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
+
+                               if 
(!timerService.shutdownServiceUninterruptible(timeoutMs)) {
+                                       LOG.warn("Timer service shutdown 
exceeded time limit of {} ms while waiting for pending " +
+                                               "timers. Will continue with 
shutdown procedure.", timeoutMs);
+                               }
+                       } catch (Throwable t) {
+                               // catch and log the exception to not replace 
the original exception
+                               LOG.error("Could not shut down timer service", 
t);
+                       }
+               }
+       }
+
        private void checkpointState(
                        CheckpointMetaData checkpointMetaData,
                        CheckpointOptions checkpointOptions,

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index be8b23c..4e4208f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,10 +18,15 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.Delayed;
@@ -41,6 +46,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class SystemProcessingTimeService extends ProcessingTimeService {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(SystemProcessingTimeService.class);
+
        private static final int STATUS_ALIVE = 0;
        private static final int STATUS_QUIESCED = 1;
        private static final int STATUS_SHUTDOWN = 2;
@@ -197,6 +204,31 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
                return timerService.awaitTermination(time, timeUnit);
        }
 
+       @Override
+       public boolean shutdownServiceUninterruptible(long timeoutMs) {
+
+               final Deadline deadline = 
Deadline.fromNow(Duration.ofMillis(timeoutMs));
+
+               boolean shutdownComplete = false;
+               boolean receivedInterrupt = false;
+
+               do {
+                       try {
+                               // wait for a reasonable time for all pending 
timer threads to finish
+                               shutdownComplete = 
shutdownAndAwaitPending(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+                       } catch (InterruptedException iex) {
+                               receivedInterrupt = true;
+                               LOG.trace("Intercepted attempt to interrupt 
timer service shutdown.", iex);
+                       }
+               } while (deadline.hasTimeLeft() && !shutdownComplete);
+
+               if (receivedInterrupt) {
+                       Thread.currentThread().interrupt();
+               }
+
+               return shutdownComplete;
+       }
+
        // safety net to destroy the thread pool
        @Override
        protected void finalize() throws Throwable {

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 2081f19..f4a5f37 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -135,6 +135,12 @@ public class TestProcessingTimeService extends ProcessingTimeService {
        }
 
        @Override
+       public boolean shutdownServiceUninterruptible(long timeoutMs) {
+               shutdownService();
+               return true;
+       }
+
+       @Override
        public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) 
throws InterruptedException {
                shutdownService();
                return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 01fd778..cfcaf72 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import 
org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
@@ -449,41 +450,11 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
        public void testShutdownAndWaitPending() {
 
                final Object lock = new Object();
-               final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
-               final OneShotLatch blockUntilTerminationInterrupts = new 
OneShotLatch();
                final OneShotLatch blockUntilTriggered = new OneShotLatch();
-               final AtomicBoolean check = new AtomicBoolean(true);
-
-               final SystemProcessingTimeService timeService = new 
SystemProcessingTimeService(
-                       (message, exception) -> {
-                       },
-                       lock);
-
-               timeService.scheduleAtFixedRate(
-                       timestamp -> {
-
-                               waitUntilTimerStarted.trigger();
-
-                               try {
-                                       blockUntilTerminationInterrupts.await();
-                                       check.set(false);
-                               } catch (InterruptedException ignore) {
-                               }
-
-                               try {
-                                       blockUntilTriggered.await();
-                               } catch (InterruptedException ignore) {
-                                       check.set(false);
-                               }
-                       },
-                       0L,
-                       10L);
+               final AtomicBoolean timerExecutionFinished = new 
AtomicBoolean(false);
 
-               try {
-                       waitUntilTimerStarted.await();
-               } catch (InterruptedException e) {
-                       Assert.fail();
-               }
+               final SystemProcessingTimeService timeService =
+                       createBlockingSystemProcessingTimeService(lock, 
blockUntilTriggered, timerExecutionFinished);
 
                Assert.assertFalse(timeService.isTerminated());
 
@@ -504,7 +475,101 @@ public class SystemProcessingTimeServiceTest extends TestLogger {
                        Assert.fail("Unexpected interruption.");
                }
 
-               Assert.assertTrue(check.get());
+               Assert.assertTrue(timerExecutionFinished.get());
+               Assert.assertTrue(timeService.isTerminated());
+       }
+
+       @Test
+       public void testShutdownServiceUninterruptible() {
+               final Object lock = new Object();
+               final OneShotLatch blockUntilTriggered = new OneShotLatch();
+               final AtomicBoolean timerFinished = new AtomicBoolean(false);
+
+               final SystemProcessingTimeService timeService =
+                       createBlockingSystemProcessingTimeService(lock, 
blockUntilTriggered, timerFinished);
+
+               Assert.assertFalse(timeService.isTerminated());
+
+               final Thread interruptTarget = Thread.currentThread();
+               final AtomicBoolean runInterrupts = new AtomicBoolean(true);
+               final Thread interruptCallerThread = new Thread(() -> {
+                       while (runInterrupts.get()) {
+                               interruptTarget.interrupt();
+                               try {
+                                       Thread.sleep(1);
+                               } catch (InterruptedException ignore) {
+                               }
+                       }
+               });
+
+               interruptCallerThread.start();
+
+               final long timeoutMs = 50L;
+               final long startTime = System.nanoTime();
+               Assert.assertFalse(timeService.isTerminated());
+               // check that termination did not succeed (because of blocking timer execution)
+               
Assert.assertFalse(timeService.shutdownServiceUninterruptible(timeoutMs));
+               // check that termination flag was set.
                Assert.assertTrue(timeService.isTerminated());
+               // check that the blocked timer is still in flight.
+               Assert.assertFalse(timerFinished.get());
+               // check that we waited until timeout
+               Assert.assertTrue((System.nanoTime() - startTime) >= 
(1_000_000L * timeoutMs));
+
+               runInterrupts.set(false);
+
+               do {
+                       try {
+                               interruptCallerThread.join();
+                       } catch (InterruptedException ignore) {
+                       }
+               } while (interruptCallerThread.isAlive());
+
+               blockUntilTriggered.trigger();
+               
Assert.assertTrue(timeService.shutdownServiceUninterruptible(timeoutMs));
+               Assert.assertTrue(timerFinished.get());
+       }
+
+       private static SystemProcessingTimeService 
createBlockingSystemProcessingTimeService(
+               final Object lock,
+               final OneShotLatch blockUntilTriggered,
+               final AtomicBoolean check) {
+
+               final OneShotLatch waitUntilTimerStarted = new OneShotLatch();
+
+               Preconditions.checkState(!check.get());
+
+               final SystemProcessingTimeService timeService = new 
SystemProcessingTimeService(
+                       (message, exception) -> {
+                       },
+                       lock);
+
+               timeService.scheduleAtFixedRate(
+                       timestamp -> {
+
+                               waitUntilTimerStarted.trigger();
+
+                               boolean unblocked = false;
+
+                               while (!unblocked) {
+                                       try {
+                                               blockUntilTriggered.await();
+                                               unblocked = true;
+                                       } catch (InterruptedException ignore) {
+                                       }
+                               }
+
+                               check.set(true);
+                       },
+                       0L,
+                       10L);
+
+               try {
+                       waitUntilTimerStarted.await();
+               } catch (InterruptedException e) {
+                       Assert.fail("Problem while starting up service.");
+               }
+
+               return timeService;
        }
 }

Reply via email to