[FLINK-4750] [runtime] Cleanly await end of all currently executing processing time timers when finite streams finish.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8aea8c8f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8aea8c8f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8aea8c8f Branch: refs/heads/master Commit: 8aea8c8f427f5511c6064abbc4b85a3ef106743a Parents: 1cd8d4f Author: Stephan Ewen <[email protected]> Authored: Wed Oct 5 14:33:01 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 20:04:34 2016 +0200 ---------------------------------------------------------------------- .../tasks/DefaultTimeServiceProvider.java | 151 +++++++++++++++- .../streaming/runtime/tasks/StreamTask.java | 3 + .../runtime/tasks/TestTimeServiceProvider.java | 55 +++--- .../runtime/tasks/TimeServiceProvider.java | 60 +++++-- .../operators/windowing/NoOpTimerService.java | 7 +- .../tasks/DefaultTimeServiceProviderTest.java | 179 +++++++++++++++++++ 6 files changed, 414 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java index 5664eac..d2c743f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java @@ -17,12 +17,20 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.streaming.runtime.operators.Triggerable; +import 
javax.annotation.Nonnull; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Delayed; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -32,6 +40,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class DefaultTimeServiceProvider extends TimeServiceProvider { + private static final int STATUS_ALIVE = 0; // timers are registered and fired normally + private static final int STATUS_QUIESCED = 1; // pending timers are dropped, new registrations get a NeverCompleteFuture + private static final int STATUS_SHUTDOWN = 2; // registering a timer throws an IllegalStateException + + // ------------------------------------------------------------------------ + /** The containing task that owns this time service provider.
*/ private final AsyncExceptionHandler task; @@ -41,6 +55,8 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { /** The executor service that schedules and calls the triggers of this task*/ private final ScheduledThreadPoolExecutor timerService; + private final AtomicInteger status; + public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) { this(failureHandler, checkpointLock, null); @@ -50,19 +66,24 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { AsyncExceptionHandler task, Object checkpointLock, ThreadFactory threadFactory) { - + this.task = checkNotNull(task); this.checkpointLock = checkNotNull(checkpointLock); + this.status = new AtomicInteger(STATUS_ALIVE); + if (threadFactory == null) { this.timerService = new ScheduledThreadPoolExecutor(1); } else { this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory); } - // allow trigger tasks to be removed if all timers for - // that timestamp are removed by user + // tasks should be removed if the future is canceled this.timerService.setRemoveOnCancelPolicy(true); + + // make sure shutdown removes all pending tasks + this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); } @Override @@ -73,17 +94,50 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { @Override public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { long delay = Math.max(timestamp - getCurrentProcessingTime(), 0); - return timerService.schedule(new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS); + + // we directly try to register the timer and only react to the status on exception + // that way we save unnecessary volatile accesses for each timer + try { + return timerService.schedule( + new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
} + catch (RejectedExecutionException e) { + final int status = this.status.get(); + if (status == STATUS_QUIESCED) { + return new NeverCompleteFuture(delay); + } + else if (status == STATUS_SHUTDOWN) { + throw new IllegalStateException("Timer service is shut down", e); + } + else { + // something else happened, so propagate the exception + throw e; + } + } } @Override public boolean isTerminated() { - return timerService.isTerminated(); + return status.get() == STATUS_SHUTDOWN; } @Override - public void shutdownService() throws Exception { - timerService.shutdownNow(); + public void quiesceAndAwaitPending() throws InterruptedException { + if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) { + timerService.shutdown(); + + // await forever (almost) + timerService.awaitTermination(365, TimeUnit.DAYS); + } + } + + @Override + public void shutdownService() { + if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || + status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) + { + timerService.shutdownNow(); + } } // safety net to destroy the thread pool @@ -93,6 +147,18 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { timerService.shutdownNow(); } + @VisibleForTesting + int getNumTasksScheduled() { + BlockingQueue<?> queue = timerService.getQueue(); + if (queue == null) { + return 0; + } else { + return queue.size(); + } + } + + // ------------------------------------------------------------------------ + /** * Internal task that is invoked by the timer service and triggers the target.
*/ @@ -122,4 +188,75 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider { } } } + + // ------------------------------------------------------------------------ + + /** Dummy future returned by registerTimer() while the service is quiesced. It never completes normally; it is only done once canceled. */ + private static final class NeverCompleteFuture implements ScheduledFuture<Object> { + + private final Object lock = new Object(); + + private final long delayMillis; + + private volatile boolean canceled; + + private NeverCompleteFuture(long delayMillis) { + this.delayMillis = delayMillis; + } + + @Override + public long getDelay(@Nonnull TimeUnit unit) { + return unit.convert(delayMillis, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(@Nonnull Delayed o) { + long otherMillis = o.getDelay(TimeUnit.MILLISECONDS); + return Long.compare(this.delayMillis, otherMillis); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (lock) { + canceled = true; + lock.notifyAll(); + } + return true; + } + + @Override + public boolean isCancelled() { + return canceled; + } + + @Override + public boolean isDone() { + return canceled; // per the Future contract, a canceled future counts as done + } + + @Override + public Object get() throws InterruptedException { + synchronized (lock) { + while (!canceled) { + lock.wait(); + } + } + throw new CancellationException(); + } + + @Override + public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException { + final long deadline = System.nanoTime() + unit.toNanos(timeout); + synchronized (lock) { + // wait against a fixed deadline: spurious wakeups must not extend the wait, and the timeout must end it + for (long remaining = deadline - System.nanoTime(); !canceled && remaining > 0; remaining = deadline - System.nanoTime()) { + TimeUnit.NANOSECONDS.timedWait(lock, remaining); + } + if (canceled) { + throw new CancellationException(); + } + throw new TimeoutException(); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index
040ec66..ff14249 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -269,6 +269,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> isRunning = true; run(); + // wait for currently executing timers to finish; timers not yet started or registered from now on will not fire + timerService.quiesceAndAwaitPending(); + LOG.debug("Finished task {}", getName()); // make sure no further checkpoint and notification actions happen. http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java index f4bead9..9eb6cd1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java @@ -39,6 +39,7 @@ public class TestTimeServiceProvider extends TimeServiceProvider { private volatile long currentTime = 0; private volatile boolean isTerminated; + private volatile boolean isQuiesced; // sorts the timers by timestamp so that they are processed in the correct order. private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>(); @@ -47,25 +48,27 @@ public class TestTimeServiceProvider extends TimeServiceProvider { public void setCurrentTime(long timestamp) throws Exception { this.currentTime = timestamp; - // decide which timers to fire and put them in a list - // we do not fire them here to be able to accommodate timers - // that register other timers.
- - Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator(); - List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>(); - while (it.hasNext()) { - Map.Entry<Long, List<Triggerable>> t = it.next(); - if (t.getKey() <= this.currentTime) { - toRun.add(t); - it.remove(); + if (!isQuiesced) { + // decide which timers to fire and put them in a list + // we do not fire them inside this loop, so that triggered timers can + // register new timers without modifying the map we iterate over. + + Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator(); + List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>(); + while (it.hasNext()) { + Map.Entry<Long, List<Triggerable>> t = it.next(); + if (t.getKey() <= this.currentTime) { + toRun.add(t); + it.remove(); + } } - } - - // now do the actual firing. - for (Map.Entry<Long, List<Triggerable>> tasks: toRun) { - long now = tasks.getKey(); - for (Triggerable task: tasks.getValue()) { - task.trigger(now); + + // now do the actual firing.
+ for (Map.Entry<Long, List<Triggerable>> tasks: toRun) { + long now = tasks.getKey(); + for (Triggerable task: tasks.getValue()) { + task.trigger(now); + } } } } @@ -80,6 +83,9 @@ public class TestTimeServiceProvider extends TimeServiceProvider { if (isTerminated) { throw new IllegalStateException("terminated"); } + if (isQuiesced) { + return new DummyFuture(); + } if (timestamp <= currentTime) { try { @@ -88,7 +94,6 @@ public class TestTimeServiceProvider extends TimeServiceProvider { throw new RuntimeException(e); } } - List<Triggerable> tasks = registeredTasks.get(timestamp); if (tasks == null) { tasks = new ArrayList<>(); @@ -105,8 +110,16 @@ public class TestTimeServiceProvider extends TimeServiceProvider { } @Override - public void shutdownService() throws Exception { - isTerminated = true; + public void quiesceAndAwaitPending() { + if (!isTerminated) { + isQuiesced = true; + registeredTasks.clear(); + } + } + + @Override + public void shutdownService() { + this.isTerminated = true; } public int getNumRegisteredTimers() { http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java index 42a4fa4..afa6f35 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java @@ -14,32 +14,70 @@ * See the License for the specific language governing permissions and * limitations under the License.
*/ + package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.streaming.runtime.operators.Triggerable; + import java.util.concurrent.ScheduledFuture; /** * Defines the current processing time and handles all related actions, * such as register timers for tasks to be executed in the future. + * + * <p>Access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of + * whether the timer service has been shut down. + * + * <p>The registration of timers follows a life cycle of three phases: + * <ol> + * <li>In the initial state, it accepts timer registrations and fires each timer when its time is reached.</li> + * <li>After calling {@link #quiesceAndAwaitPending()}, further calls to + * {@link #registerTimer(long, Triggerable)} will not register any further timers, and will + * return a "dummy" future as a result. This is used for clean shutdown, where currently firing + * timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li> + * <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)} + * will result in a hard exception.</li> + * </ol> */ public abstract class TimeServiceProvider { - /** Returns the current processing time. */ + /** + * Returns the current processing time. + */ public abstract long getCurrentProcessingTime(); - /** Registers a task to be executed when (processing) time is {@code timestamp}. - * @param timestamp - * when the task is to be executed (in processing time) - * @param target - * the task to be executed - * @return the result to be returned. + /** + * Registers a task to be executed when (processing) time is {@code timestamp}. + * + * @param timestamp Time when the task is to be executed (in processing time) + * @param target The task to be executed + * + * @return The future that represents the scheduled task.
This always returns some future, + * even if the service has been quiesced; after a shutdown, registration throws an exception instead. */ - public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable target); + public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target); - /** Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise. */ + /** + * Returns <tt>true</tt> if the service has been shut down via {@link #shutdownService()}, <tt>false</tt> otherwise. + */ public abstract boolean isTerminated(); - /** Shuts down and clean up the timer service provider. */ - public abstract void shutdownService() throws Exception; + /** + * This method puts the service into a state where it does not register new timers, but + * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future. + * Furthermore, the method clears all timers that have not yet started, and awaits the completion + * of currently executing timers. + * + * <p>This method can be used to cleanly shut down the timer service. Components using the service + * will not notice that it is shut down (for example, through exceptions when registering + * a new timer); the service will simply not fire any more timers. + */ + public abstract void quiesceAndAwaitPending() throws InterruptedException; + + /** + * Shuts down and cleans up the timer service provider hard and immediately. This does not wait + * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)} + * will result in a hard exception.
+ */ + public abstract void shutdownService(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java index 16e658e..d0c5050 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java @@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledFuture; class NoOpTimerService extends TimeServiceProvider { private volatile boolean terminated; - + @Override public long getCurrentProcessingTime() { return System.currentTimeMillis(); @@ -43,7 +43,10 @@ class NoOpTimerService extends TimeServiceProvider { } @Override - public void shutdownService() throws Exception { + public void quiesceAndAwaitPending() {} + + @Override + public void shutdownService() { terminated = true; } } http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java new file mode 100644 index 0000000..ae895b6 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.Triggerable; + +import org.junit.Test; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class DefaultTimeServiceProviderTest { + + @Test + public void testImmediateShutdown() throws Exception { + + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + assertFalse(timer.isTerminated()); + + final OneShotLatch latch = new OneShotLatch(); + + // the task should trigger immediately and should block until terminated
with interruption + timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + latch.trigger(); + Thread.sleep(100000000); + } + }); + + latch.await(); + timer.shutdownService(); + + // can only enter this scope after the triggerable is interrupted + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (lock) { + assertTrue(timer.isTerminated()); + } + + try { + timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() { + @Override + public void trigger(long timestamp) {} + }); + + fail("should result in an exception"); + } + catch (IllegalStateException e) { + // expected + } + + // the interrupted trigger must have reported an asynchronous error caused by an InterruptedException + assertNotNull(errorRef.get()); + assertTrue(errorRef.get().getCause() instanceof InterruptedException); + + assertEquals(0, timer.getNumTasksScheduled()); + } + finally { + timer.shutdownService(); + } + } + + @Test + public void testQuiescing() throws Exception { + + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + final OneShotLatch latch = new OneShotLatch(); + + final ReentrantLock scopeLock = new ReentrantLock(); + + timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + scopeLock.lock(); + try { + latch.trigger(); + // delay a bit before leaving the method + Thread.sleep(5); + } finally { + scopeLock.unlock(); + } + } + }); + + // after the task triggered, shut the timer down cleanly, waiting for the task to finish + latch.await(); + timer.quiesceAndAwaitPending(); + + // should be able to immediately acquire the lock, since the task must have exited by now + assertTrue(scopeLock.tryLock()); + + // should be able to
schedule more tasks (that never get executed) + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + throw new Exception("test"); + } + }); + assertNotNull(future); + + // the quiesced service must not have queued the newly registered timer + assertEquals(0, timer.getNumTasksScheduled()); + + // check that no asynchronous error was reported - that ensures that the newly scheduled + // triggerable did, in fact, not trigger + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + } + finally { + timer.shutdownService(); + } + } + + @Test + public void testFutureCancellation() throws Exception { + + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + assertEquals(0, timer.getNumTasksScheduled()); + + // schedule something + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() { + @Override + public void trigger(long timestamp) {} + }); + assertEquals(1, timer.getNumTasksScheduled()); + + future.cancel(false); + + assertEquals(0, timer.getNumTasksScheduled()); + } + finally { + timer.shutdownService(); + } + } +}
