Repository: flink Updated Branches: refs/heads/master babee2772 -> b7f0f5f4d
[FLINK-7666] Close TimeService after closing operators. This was revealed through the continuous file reader. Previously the StreamTask was calling the quiesceAndAwaitPending() of the TimerService before the close() of the operator. This meant that with a periodic watermark emitter and a small file (e.g. one split), the timer service would be closed before even starting to read (as soon as the reader received the first split), and no timers would be registered to emit watermarks. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7f0f5f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7f0f5f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7f0f5f4 Branch: refs/heads/master Commit: b7f0f5f4d1dc49cafd50c7c72148580fe6947ef5 Parents: babee27 Author: kkloudas <[email protected]> Authored: Thu Oct 19 10:23:39 2017 +0200 Committer: kkloudas <[email protected]> Committed: Thu Oct 26 00:06:08 2017 +0200 ---------------------------------------------------------------------- .../runtime/tasks/ProcessingTimeService.java | 13 +++- .../streaming/runtime/tasks/StreamTask.java | 11 ++- .../tasks/SystemProcessingTimeService.java | 77 ++++++++++++++------ .../tasks/TestProcessingTimeService.java | 7 +- .../runtime/tasks/OneInputStreamTaskTest.java | 47 ++++++++++++ .../tasks/SystemProcessingTimeServiceTest.java | 12 +-- 6 files changed, 132 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index 
11074a2..b238252 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -29,7 +29,7 @@ import java.util.concurrent.ScheduledFuture; * <p>The registration of timers follows a life cycle of three phases: * <ol> * <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li> - * <li>After calling {@link #quiesceAndAwaitPending()}, further calls to + * <li>After calling {@link #quiesce()}, further calls to * {@link #registerTimer(long, ProcessingTimeCallback)} will not register any further timers, and will * return a "dummy" future as a result. This is used for clean shutdown, where currently firing * timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li> @@ -73,14 +73,19 @@ public abstract class ProcessingTimeService { /** * This method puts the service into a state where it does not register new timers, but * returns for each call to {@link #registerTimer(long, ProcessingTimeCallback)} only a "mock" future. - * Furthermore, the method clears all not yet started timers, and awaits the completion - * of currently executing timers. + * Furthermore, the method clears all not yet started timers. * * <p>This method can be used to cleanly shut down the timer service. The using components * will not notice that the service is shut down (as for example via exceptions when registering * a new timer), but the service will simply not fire any timer any more. */ - public abstract void quiesceAndAwaitPending() throws InterruptedException; + public abstract void quiesce() throws InterruptedException; + + /** + * This method can be used after calling {@link #quiesce()}, and awaits the completion + * of currently executing timers. 
+ */ + public abstract void awaitPendingAfterQuiesce() throws InterruptedException; /** * Shuts down and clean up the timer service provider hard and immediately. This does not wait http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 631cdfc..68f590e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -194,6 +195,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners} * and {@link org.apache.flink.streaming.api.windowing.triggers.Trigger Triggers}. * */ + @VisibleForTesting public void setProcessingTimeService(ProcessingTimeService timeProvider) { if (timeProvider == null) { throw new RuntimeException("The timeProvider cannot be set to null."); @@ -266,9 +268,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> throw new CancelTaskException(); } - // make sure all timers finish and no new timers can come - timerService.quiesceAndAwaitPending(); - LOG.debug("Finished task {}", getName()); // make sure no further checkpoint and notification actions happen. 
@@ -280,11 +279,17 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // this is part of the main logic, so if this fails, the task is considered failed closeAllOperators(); + // make sure no new timers can come + timerService.quiesce(); + // only set the StreamTask to not running after all operators have been closed! // See FLINK-7430 isRunning = false; } + // make sure all timers finish + timerService.awaitPendingAfterQuiesce(); + LOG.debug("Closed operators for task {}", getName()); // make sure all buffered data is flushed http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index d00c1b9..71bfdf6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -108,7 +108,7 @@ public class SystemProcessingTimeService extends ProcessingTimeService { // that way we save unnecessary volatile accesses for each timer try { return timerService.schedule( - new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS); + new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { final int status = this.status.get(); @@ -133,7 +133,7 @@ public class SystemProcessingTimeService extends ProcessingTimeService { // that way we save unnecessary volatile accesses for each timer try { return timerService.scheduleAtFixedRate( - new RepeatedTriggerTask(task, 
checkpointLock, callback, nextTimestamp, period), + new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period), initialDelay, period, TimeUnit.MILLISECONDS); @@ -152,18 +152,34 @@ public class SystemProcessingTimeService extends ProcessingTimeService { } } + /** + * @return {@code true} if the status of the service + * is {@link #STATUS_ALIVE}, {@code false} otherwise. + */ + @VisibleForTesting + boolean isAlive() { + return status.get() == STATUS_ALIVE; + } + @Override public boolean isTerminated() { return status.get() == STATUS_SHUTDOWN; } @Override - public void quiesceAndAwaitPending() throws InterruptedException { + public void quiesce() throws InterruptedException { if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) { timerService.shutdown(); + } + } + + @Override + public void awaitPendingAfterQuiesce() throws InterruptedException { + if (!timerService.isTerminated()) { + Preconditions.checkState(timerService.isTerminating() || timerService.isShutdown()); // await forever (almost) - timerService.awaitTermination(365, TimeUnit.DAYS); + timerService.awaitTermination(365L, TimeUnit.DAYS); } } @@ -199,15 +215,23 @@ public class SystemProcessingTimeService extends ProcessingTimeService { */ private static final class TriggerTask implements Runnable { + private final AtomicInteger serviceStatus; private final Object lock; private final ProcessingTimeCallback target; private final long timestamp; private final AsyncExceptionHandler exceptionHandler; - TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, ProcessingTimeCallback target, long timestamp) { - this.exceptionHandler = exceptionHandler; - this.lock = lock; - this.target = target; + private TriggerTask( + final AtomicInteger serviceStatus, + final AsyncExceptionHandler exceptionHandler, + final Object lock, + final ProcessingTimeCallback target, + final long timestamp) { + + this.serviceStatus = Preconditions.checkNotNull(serviceStatus); + 
this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler); + this.lock = Preconditions.checkNotNull(lock); + this.target = Preconditions.checkNotNull(target); this.timestamp = timestamp; } @@ -215,7 +239,9 @@ public class SystemProcessingTimeService extends ProcessingTimeService { public void run() { synchronized (lock) { try { - target.onProcessingTime(timestamp); + if (serviceStatus.get() == STATUS_ALIVE) { + target.onProcessingTime(timestamp); + } } catch (Throwable t) { TimerException asyncException = new TimerException(t); exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException); @@ -228,6 +254,8 @@ public class SystemProcessingTimeService extends ProcessingTimeService { * Internal task which is repeatedly called by the processing time service. */ private static final class RepeatedTriggerTask implements Runnable { + + private final AtomicInteger serviceStatus; private final Object lock; private final ProcessingTimeCallback target; private final long period; @@ -236,11 +264,14 @@ public class SystemProcessingTimeService extends ProcessingTimeService { private long nextTimestamp; private RepeatedTriggerTask( - AsyncExceptionHandler exceptionHandler, - Object lock, - ProcessingTimeCallback target, - long nextTimestamp, - long period) { + final AtomicInteger serviceStatus, + final AsyncExceptionHandler exceptionHandler, + final Object lock, + final ProcessingTimeCallback target, + final long nextTimestamp, + final long period) { + + this.serviceStatus = Preconditions.checkNotNull(serviceStatus); this.lock = Preconditions.checkNotNull(lock); this.target = Preconditions.checkNotNull(target); this.period = period; @@ -251,15 +282,17 @@ public class SystemProcessingTimeService extends ProcessingTimeService { @Override public void run() { - try { - synchronized (lock) { - target.onProcessingTime(nextTimestamp); - } + synchronized (lock) { + try { + if (serviceStatus.get() == STATUS_ALIVE) { + 
target.onProcessingTime(nextTimestamp); + } - nextTimestamp += period; - } catch (Throwable t) { - TimerException asyncException = new TimerException(t); - exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException); + nextTimestamp += period; + } catch (Throwable t) { + TimerException asyncException = new TimerException(t); + exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index a9d5205..080eeb5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -117,7 +117,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { } @Override - public void quiesceAndAwaitPending() { + public void quiesce() { if (!isTerminated) { isQuiesced = true; priorityQueue.clear(); @@ -125,6 +125,11 @@ public class TestProcessingTimeService extends ProcessingTimeService { } @Override + public void awaitPendingAfterQuiesce() throws InterruptedException { + // do nothing. 
+ } + + @Override public void shutdownService() { this.isTerminated = true; } http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 394bc35..d60966a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -541,6 +541,53 @@ public class OneInputStreamTaskTest extends TestLogger { TestingStreamOperator.numberRestoreCalls = 0; } + @Test + public void testQuiesceTimerServiceAfterOpClose() throws Exception { + + final OneInputStreamTask<String, String> task = new OneInputStreamTask<>(); + + SystemProcessingTimeService timeService = new SystemProcessingTimeService(task, task.getCheckpointLock()); + task.setProcessingTimeService(timeService); + + // verify that the timer service is running + Assert.assertTrue(((SystemProcessingTimeService) task.getProcessingTimeService()).isAlive()); + + final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>( + task, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + testHarness.setupOutputForSingletonOperatorChain(); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + streamConfig.setStreamOperator(new TestOperator()); + streamConfig.setOperatorID(new OperatorID()); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + timeService.shutdownService(); + } + + private static class TestOperator + 
extends AbstractStreamOperator<String> + implements OneInputStreamOperator<String, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + output.collect(element); + } + + @Override + public void close() throws Exception { + + // verify that the timer service is still running + Assert.assertTrue( + ((SystemProcessingTimeService) getContainingTask().getProcessingTimeService()) + .isAlive()); + super.close(); + } + } //============================================================================================== // Utility functions and classes http://git-wip-us.apache.org/repos/asf/flink/blob/b7f0f5f4/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 890fc23..4c105d3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -180,7 +180,8 @@ public class SystemProcessingTimeServiceTest extends TestLogger { assertFalse(scheduledFuture.isDone()); // this should cancel our future - timer.quiesceAndAwaitPending(); + timer.quiesce(); + timer.awaitPendingAfterQuiesce(); // it may be that the cancelled status is not immediately visible after the // termination (not necessary a volatile update), so we need to "get()" the cancellation @@ -297,7 +298,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger { final ReentrantLock scopeLock = new ReentrantLock(); - 
timer.registerTimer(System.currentTimeMillis() + 20, new ProcessingTimeCallback() { + timer.registerTimer(timer.getCurrentProcessingTime() + 20L, new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception { scopeLock.lock(); @@ -313,13 +314,14 @@ public class SystemProcessingTimeServiceTest extends TestLogger { // after the task triggered, shut the timer down cleanly, waiting for the task to finish latch.await(); - timer.quiesceAndAwaitPending(); + timer.quiesce(); + timer.awaitPendingAfterQuiesce(); // should be able to immediately acquire the lock, since the task must have exited by now assertTrue(scopeLock.tryLock()); // should be able to schedule more tasks (that never get executed) - ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new ProcessingTimeCallback() { + ScheduledFuture<?> future = timer.registerTimer(timer.getCurrentProcessingTime() - 5L, new ProcessingTimeCallback() { @Override public void onProcessingTime(long timestamp) throws Exception { throw new Exception("test"); @@ -328,7 +330,7 @@ public class SystemProcessingTimeServiceTest extends TestLogger { assertNotNull(future); // nothing should be scheduled right now - assertEquals(0, timer.getNumTasksScheduled()); + assertEquals(0L, timer.getNumTasksScheduled()); // check that no asynchronous error was reported - that ensures that the newly scheduled // triggerable did, in fact, not trigger
