[FLINK-9304] Timer service shutdown should not stop if interrupted. This closes #5962.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4e03689 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4e03689 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4e03689 Branch: refs/heads/master Commit: f4e03689dd5fef8eafeb0996a31ea021c5ea2203 Parents: d734032 Author: Stefan Richter <[email protected]> Authored: Mon May 7 11:55:35 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue May 15 00:23:10 2018 +0200 ---------------------------------------------------------------------- .../runtime/tasks/ProcessingTimeService.java | 11 ++ .../streaming/runtime/tasks/StreamTask.java | 44 +++--- .../tasks/SystemProcessingTimeService.java | 32 +++++ .../tasks/TestProcessingTimeService.java | 6 + .../tasks/SystemProcessingTimeServiceTest.java | 133 ++++++++++++++----- 5 files changed, 168 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index 2516299..4515ce2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -96,6 +96,17 @@ public abstract class ProcessingTimeService { public abstract void shutdownService(); /** + * Shuts down and clean up the timer service provider hard and immediately. This does not wait + * for any timer to complete. 
Any further call to {@link #registerTimer(long, ProcessingTimeCallback)} + * will result in a hard exception. This call cannot be interrupted and will block until the shutdown is completed + * or the timeout is exceeded. + * + * @param timeoutMs timeout for blocking on the service shutdown in milliseconds. + * @return returns true iff the shutdown was completed. + */ + public abstract boolean shutdownServiceUninterruptible(long timeoutMs); + + /** * Shuts down and clean up the timer service provider hard and immediately. This does wait * for all timers to complete or until the time limit is exceeded. Any call to * {@link #registerTimer(long, ProcessingTimeCallback)} will result in a hard exception after calling this method. http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 6790949..2cc8886 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -73,7 +73,6 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -348,30 +347,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // clean up everything we initialized isRunning = false; - // clear the interrupted status so that we can wait for the following resource shutdowns to complete - Thread.interrupted(); - // stop all timers and threads - if (timerService != null && 
!timerService.isTerminated()) { - try { - - final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration(). - getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS); - - // wait for a reasonable time for all pending timer threads to finish - boolean timerShutdownComplete = - timerService.shutdownAndAwaitPending(timeoutMs, TimeUnit.MILLISECONDS); - - if (!timerShutdownComplete) { - LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " + - "timers. Will continue with shutdown procedure.", timeoutMs); - } - } - catch (Throwable t) { - // catch and log the exception to not replace the original exception - LOG.error("Could not shut down timer service", t); - } - } + tryShutdownTimerService(); // stop all asynchronous checkpoint threads try { @@ -706,6 +683,25 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } + private void tryShutdownTimerService() { + + if (timerService != null && !timerService.isTerminated()) { + + try { + final long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration(). + getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS); + + if (!timerService.shutdownServiceUninterruptible(timeoutMs)) { + LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending " + + "timers. 
Will continue with shutdown procedure.", timeoutMs); + } + } catch (Throwable t) { + // catch and log the exception to not replace the original exception + LOG.error("Could not shut down timer service", t); + } + } + } + private void checkpointState( CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index be8b23c..4e4208f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -18,10 +18,15 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.Delayed; @@ -41,6 +46,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class SystemProcessingTimeService extends ProcessingTimeService { + private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class); + private static final int STATUS_ALIVE = 0; private static final int STATUS_QUIESCED = 1; private static final int STATUS_SHUTDOWN = 2; @@ -197,6 +204,31 @@ public class SystemProcessingTimeService extends 
ProcessingTimeService { return timerService.awaitTermination(time, timeUnit); } + @Override + public boolean shutdownServiceUninterruptible(long timeoutMs) { + + final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs)); + + boolean shutdownComplete = false; + boolean receivedInterrupt = false; + + do { + try { + // wait for a reasonable time for all pending timer threads to finish + shutdownComplete = shutdownAndAwaitPending(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException iex) { + receivedInterrupt = true; + LOG.trace("Intercepted attempt to interrupt timer service shutdown.", iex); + } + } while (deadline.hasTimeLeft() && !shutdownComplete); + + if (receivedInterrupt) { + Thread.currentThread().interrupt(); + } + + return shutdownComplete; + } + // safety net to destroy the thread pool @Override protected void finalize() throws Throwable { http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index 2081f19..f4a5f37 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -135,6 +135,12 @@ public class TestProcessingTimeService extends ProcessingTimeService { } @Override + public boolean shutdownServiceUninterruptible(long timeoutMs) { + shutdownService(); + return true; + } + + @Override public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException { shutdownService(); return true; 
http://git-wip-us.apache.org/repos/asf/flink/blob/f4e03689/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 01fd778..cfcaf72 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -449,41 +450,11 @@ public class SystemProcessingTimeServiceTest extends TestLogger { public void testShutdownAndWaitPending() { final Object lock = new Object(); - final OneShotLatch waitUntilTimerStarted = new OneShotLatch(); - final OneShotLatch blockUntilTerminationInterrupts = new OneShotLatch(); final OneShotLatch blockUntilTriggered = new OneShotLatch(); - final AtomicBoolean check = new AtomicBoolean(true); - - final SystemProcessingTimeService timeService = new SystemProcessingTimeService( - (message, exception) -> { - }, - lock); - - timeService.scheduleAtFixedRate( - timestamp -> { - - waitUntilTimerStarted.trigger(); - - try { - blockUntilTerminationInterrupts.await(); - check.set(false); - } catch (InterruptedException ignore) { - } - - try { - blockUntilTriggered.await(); - } catch (InterruptedException ignore) { - check.set(false); - } - }, - 0L, - 10L); + final 
AtomicBoolean timerExecutionFinished = new AtomicBoolean(false); - try { - waitUntilTimerStarted.await(); - } catch (InterruptedException e) { - Assert.fail(); - } + final SystemProcessingTimeService timeService = + createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerExecutionFinished); Assert.assertFalse(timeService.isTerminated()); @@ -504,7 +475,101 @@ public class SystemProcessingTimeServiceTest extends TestLogger { Assert.fail("Unexpected interruption."); } - Assert.assertTrue(check.get()); + Assert.assertTrue(timerExecutionFinished.get()); + Assert.assertTrue(timeService.isTerminated()); + } + + @Test + public void testShutdownServiceUninterruptible() { + final Object lock = new Object(); + final OneShotLatch blockUntilTriggered = new OneShotLatch(); + final AtomicBoolean timerFinished = new AtomicBoolean(false); + + final SystemProcessingTimeService timeService = + createBlockingSystemProcessingTimeService(lock, blockUntilTriggered, timerFinished); + + Assert.assertFalse(timeService.isTerminated()); + + final Thread interruptTarget = Thread.currentThread(); + final AtomicBoolean runInterrupts = new AtomicBoolean(true); + final Thread interruptCallerThread = new Thread(() -> { + while (runInterrupts.get()) { + interruptTarget.interrupt(); + try { + Thread.sleep(1); + } catch (InterruptedException ignore) { + } + } + }); + + interruptCallerThread.start(); + + final long timeoutMs = 50L; + final long startTime = System.nanoTime(); + Assert.assertFalse(timeService.isTerminated()); + // check that termination did not succeed (because of blocking timer execution) + Assert.assertFalse(timeService.shutdownServiceUninterruptible(timeoutMs)); + // check that termination flag was set. Assert.assertTrue(timeService.isTerminated()); + // check that the blocked timer is still in flight. 
+ Assert.assertFalse(timerFinished.get()); + // check that we waited until timeout + Assert.assertTrue((System.nanoTime() - startTime) >= (1_000_000L * timeoutMs)); + + runInterrupts.set(false); + + do { + try { + interruptCallerThread.join(); + } catch (InterruptedException ignore) { + } + } while (interruptCallerThread.isAlive()); + + blockUntilTriggered.trigger(); + Assert.assertTrue(timeService.shutdownServiceUninterruptible(timeoutMs)); + Assert.assertTrue(timerFinished.get()); + } + + private static SystemProcessingTimeService createBlockingSystemProcessingTimeService( + final Object lock, + final OneShotLatch blockUntilTriggered, + final AtomicBoolean check) { + + final OneShotLatch waitUntilTimerStarted = new OneShotLatch(); + + Preconditions.checkState(!check.get()); + + final SystemProcessingTimeService timeService = new SystemProcessingTimeService( + (message, exception) -> { + }, + lock); + + timeService.scheduleAtFixedRate( + timestamp -> { + + waitUntilTimerStarted.trigger(); + + boolean unblocked = false; + + while (!unblocked) { + try { + blockUntilTriggered.await(); + unblocked = true; + } catch (InterruptedException ignore) { + } + } + + check.set(true); + }, + 0L, + 10L); + + try { + waitUntilTimerStarted.await(); + } catch (InterruptedException e) { + Assert.fail("Problem while starting up service."); + } + + return timeService; } }
