[FLINK-4750] [runtime] Cleanly await end of all currently executing processing 
time timers when finite streams finish.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8aea8c8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8aea8c8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8aea8c8f

Branch: refs/heads/master
Commit: 8aea8c8f427f5511c6064abbc4b85a3ef106743a
Parents: 1cd8d4f
Author: Stephan Ewen <[email protected]>
Authored: Wed Oct 5 14:33:01 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 20:04:34 2016 +0200

----------------------------------------------------------------------
 .../tasks/DefaultTimeServiceProvider.java       | 151 +++++++++++++++-
 .../streaming/runtime/tasks/StreamTask.java     |   3 +
 .../runtime/tasks/TestTimeServiceProvider.java  |  55 +++---
 .../runtime/tasks/TimeServiceProvider.java      |  60 +++++--
 .../operators/windowing/NoOpTimerService.java   |   7 +-
 .../tasks/DefaultTimeServiceProviderTest.java   | 179 +++++++++++++++++++
 6 files changed, 414 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index 5664eac..d2c743f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,12 +17,20 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 
+import javax.annotation.Nonnull;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -32,6 +40,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class DefaultTimeServiceProvider extends TimeServiceProvider {
 
+       private static final int STATUS_ALIVE = 0;
+       private static final int STATUS_QUIESCED = 1;
+       private static final int STATUS_SHUTDOWN = 2;
+
+       // 
------------------------------------------------------------------------
+
        /** The containing task that owns this time service provider. */
        private final AsyncExceptionHandler task;
 
@@ -41,6 +55,8 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
        /** The executor service that schedules and calls the triggers of this 
task*/
        private final ScheduledThreadPoolExecutor timerService;
 
+       private final AtomicInteger status;
+
 
        public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, 
Object checkpointLock) {
                this(failureHandler, checkpointLock, null);
@@ -50,19 +66,24 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
                        AsyncExceptionHandler task,
                        Object checkpointLock,
                        ThreadFactory threadFactory) {
-               
+
                this.task = checkNotNull(task);
                this.checkpointLock = checkNotNull(checkpointLock);
 
+               this.status = new AtomicInteger(STATUS_ALIVE);
+
                if (threadFactory == null) {
                        this.timerService = new ScheduledThreadPoolExecutor(1);
                } else {
                        this.timerService = new ScheduledThreadPoolExecutor(1, 
threadFactory);
                }
 
-               // allow trigger tasks to be removed if all timers for
-               // that timestamp are removed by user
+               // tasks should be removed if the future is canceled
                this.timerService.setRemoveOnCancelPolicy(true);
+
+               // make sure shutdown removes all pending tasks
+               
this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+               
this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        }
 
        @Override
@@ -73,17 +94,50 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
        @Override
        public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
                long delay = Math.max(timestamp - getCurrentProcessingTime(), 
0);
-               return timerService.schedule(new TriggerTask(task, 
checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+
+               // we directly try to register the timer and only react to the 
status on exception
+               // that way we save unnecessary volatile accesses for each timer
+               try {
+                       return timerService.schedule(
+                                       new TriggerTask(task, checkpointLock, 
target, timestamp), delay, TimeUnit.MILLISECONDS);
+               }
+               catch (RejectedExecutionException e) {
+                       final int status = this.status.get();
+                       if (status == STATUS_QUIESCED) {
+                               return new NeverCompleteFuture(delay);
+                       }
+                       else if (status == STATUS_SHUTDOWN) {
+                               throw new IllegalStateException("Timer service 
is shut down");
+                       }
+                       else {
+                               // something else happened, so propagate the 
exception
+                               throw e;
+                       }
+               }
        }
 
        @Override
        public boolean isTerminated() {
-               return timerService.isTerminated();
+               return status.get() == STATUS_SHUTDOWN;
        }
 
        @Override
-       public void shutdownService() throws Exception {
-               timerService.shutdownNow();
+       public void quiesceAndAwaitPending() throws InterruptedException {
+               if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
+                       timerService.shutdown();
+
+                       // await forever (almost)
+                       timerService.awaitTermination(365, TimeUnit.DAYS);
+               }
+       }
+
+       @Override
+       public void shutdownService() {
+               if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || 
+                               status.compareAndSet(STATUS_QUIESCED, 
STATUS_SHUTDOWN))
+               {
+                       timerService.shutdownNow();
+               }
        }
 
        // safety net to destroy the thread pool
@@ -93,6 +147,18 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
                timerService.shutdownNow();
        }
 
+       @VisibleForTesting
+       int getNumTasksScheduled() {
+               BlockingQueue<?> queue = timerService.getQueue();
+               if (queue == null) {
+                       return 0;
+               } else {
+                       return queue.size();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
        /**
         * Internal task that is invoked by the timer service and triggers the 
target.
         */
@@ -122,4 +188,75 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
                        }
                }
        }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class NeverCompleteFuture implements 
ScheduledFuture<Object> {
+
+               private final Object lock = new Object();
+
+               private final long delayMillis;
+
+               private volatile boolean canceled;
+
+
+               private NeverCompleteFuture(long delayMillis) {
+                       this.delayMillis = delayMillis;
+               }
+
+               @Override
+               public long getDelay(@Nonnull TimeUnit unit) {
+                       return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
+               }
+
+               @Override
+               public int compareTo(@Nonnull Delayed o) {
+                       long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
+                       return Long.compare(this.delayMillis, otherMillis);
+               }
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       synchronized (lock) {
+                               canceled = true;
+                               lock.notifyAll();
+                       }
+                       return true;
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       return canceled;
+               }
+
+               @Override
+               public boolean isDone() {
+                       return false;
+               }
+
+               @Override
+               public Object get() throws InterruptedException {
+                       synchronized (lock) {
+                               while (!canceled) {
+                                       lock.wait();
+                               }
+                       }
+                       throw new CancellationException();
+               }
+
+               @Override
+               public Object get(long timeout, @Nonnull TimeUnit unit) throws 
InterruptedException, TimeoutException {
+                       synchronized (lock) {
+                               while (!canceled) {
+                                       unit.timedWait(lock, timeout);
+                               }
+
+                               if (canceled) {
+                                       throw new CancellationException();
+                               } else {
+                                       throw new TimeoutException();
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 040ec66..ff14249 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -269,6 +269,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        isRunning = true;
                        run();
 
+                       // make sure all timers finish and no new timers can 
come
+                       timerService.quiesceAndAwaitPending();
+
                        LOG.debug("Finished task {}", getName());
 
                        // make sure no further checkpoint and notification 
actions happen.

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index f4bead9..9eb6cd1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -39,6 +39,7 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
        private volatile long currentTime = 0;
 
        private volatile boolean isTerminated;
+       private volatile boolean isQuiesced;
 
        // sorts the timers by timestamp so that they are processed in the 
correct order.
        private final Map<Long, List<Triggerable>> registeredTasks = new 
TreeMap<>();
@@ -47,25 +48,27 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
        public void setCurrentTime(long timestamp) throws Exception {
                this.currentTime = timestamp;
 
-               // decide which timers to fire and put them in a list
-               // we do not fire them here to be able to accommodate timers
-               // that register other timers.
-
-               Iterator<Map.Entry<Long, List<Triggerable>>> it = 
registeredTasks.entrySet().iterator();
-               List<Map.Entry<Long, List<Triggerable>>> toRun = new 
ArrayList<>();
-               while (it.hasNext()) {
-                       Map.Entry<Long, List<Triggerable>> t = it.next();
-                       if (t.getKey() <= this.currentTime) {
-                               toRun.add(t);
-                               it.remove();
+               if (!isQuiesced) {
+                       // decide which timers to fire and put them in a list
+                       // we do not fire them here to be able to accommodate 
timers
+                       // that register other timers.
+       
+                       Iterator<Map.Entry<Long, List<Triggerable>>> it = 
registeredTasks.entrySet().iterator();
+                       List<Map.Entry<Long, List<Triggerable>>> toRun = new 
ArrayList<>();
+                       while (it.hasNext()) {
+                               Map.Entry<Long, List<Triggerable>> t = 
it.next();
+                               if (t.getKey() <= this.currentTime) {
+                                       toRun.add(t);
+                                       it.remove();
+                               }
                        }
-               }
-
-               // now do the actual firing.
-               for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
-                       long now = tasks.getKey();
-                       for (Triggerable task: tasks.getValue()) {
-                               task.trigger(now);
+       
+                       // now do the actual firing.
+                       for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+                               long now = tasks.getKey();
+                               for (Triggerable task: tasks.getValue()) {
+                                       task.trigger(now);
+                               }
                        }
                }
        }
@@ -80,6 +83,9 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
                if (isTerminated) {
                        throw new IllegalStateException("terminated");
                }
+               if (isQuiesced) {
+                       return new DummyFuture();
+               }
 
                if (timestamp <= currentTime) {
                        try {
@@ -88,7 +94,6 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
                                throw new RuntimeException(e);
                        }
                }
-
                List<Triggerable> tasks = registeredTasks.get(timestamp);
                if (tasks == null) {
                        tasks = new ArrayList<>();
@@ -105,8 +110,16 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
        }
 
        @Override
-       public void shutdownService() throws Exception {
-               isTerminated = true;
+       public void quiesceAndAwaitPending() {
+               if (!isTerminated) {
+                       isQuiesced = true;
+                       registeredTasks.clear();
+               }
+       }
+
+       @Override
+       public void shutdownService() {
+               this.isTerminated = true;
        }
 
        public int getNumRegisteredTimers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
index 42a4fa4..afa6f35 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
@@ -14,32 +14,70 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+
 import java.util.concurrent.ScheduledFuture;
 
 /**
  * Defines the current processing time and handles all related actions,
  * such as register timers for tasks to be executed in the future.
+ * 
+ * <p>The access to the time via {@link #getCurrentProcessingTime()} is always 
available, regardless of
+ * whether the timer service has been shut down.
+ * 
+ * <p>The registration of timers follows a life cycle of three phases:
+ * <ol>
+ *     <li>In the initial state, it accepts timer registrations and triggers 
when the time is reached.</li>
+ *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
+ *         {@link #registerTimer(long, Triggerable)} will not register any 
further timers, and will
+ *         return a "dummy" future as a result. This is used for clean 
shutdown, where currently firing
+ *         timers are waited for and no future timers can be scheduled, 
without causing hard exceptions.</li>
+ *     <li>After a call to {@link #shutdownService()}, all calls to {@link 
#registerTimer(long, Triggerable)}
+ *         will result in a hard exception.</li>
+ * </ol>
  */
 public abstract class TimeServiceProvider {
 
-       /** Returns the current processing time. */
+       /**
+        * Returns the current processing time.
+        */
        public abstract long getCurrentProcessingTime();
 
-       /** Registers a task to be executed when (processing) time is {@code 
timestamp}.
-        * @param timestamp
-        *                                              when the task is to be 
executed (in processing time)
-        * @param target
-        *                                              the task to be executed
-        * @return the result to be returned.
+       /**
+        * Registers a task to be executed when (processing) time is {@code 
timestamp}.
+        * 
+        * @param timestamp   Time when the task is to be executed (in 
processing time)
+        * @param target      The task to be executed
+        * 
+        * @return The future that represents the scheduled task. This always 
returns some future,
+        *         even if the timer was shut down
         */
-       public abstract ScheduledFuture<?> registerTimer(final long timestamp, 
final Triggerable target);
+       public abstract ScheduledFuture<?> registerTimer(long timestamp, 
Triggerable target);
 
-       /** Returns <tt>true</tt> if the service has been shut down, 
<tt>false</tt> otherwise. */
+       /**
+        * Returns <tt>true</tt> if the service has been shut down, 
<tt>false</tt> otherwise.
+        */
        public abstract boolean isTerminated();
 
-       /** Shuts down and clean up the timer service provider. */
-       public abstract void shutdownService() throws Exception;
+       /**
+        * This method puts the service into a state where it does not register 
new timers, but
+        * returns for each call to {@link #registerTimer(long, Triggerable)} 
only a "mock" future.
+        * Furthermore, the method clears all not yet started timers, and 
awaits the completion
+        * of currently executing timers.
+        * 
+        * <p>This method can be used to cleanly shut down the timer service. 
The using components
+        * will not notice that the service is shut down (as for example via 
exceptions when registering
+        * a new timer), but the service will simply not fire any timer any 
more.
+        */
+       public abstract void quiesceAndAwaitPending() throws 
InterruptedException;
+
+       /**
+        * Shuts down and clean up the timer service provider hard and 
immediately. This does not wait
+        * for any timer to complete. Any further call to {@link 
#registerTimer(long, Triggerable)}
+        * will result in a hard exception.
+        */
+       public abstract void shutdownService();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
index 16e658e..d0c5050 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -26,7 +26,7 @@ import java.util.concurrent.ScheduledFuture;
 class NoOpTimerService extends TimeServiceProvider {
 
        private volatile boolean terminated;
-       
+
        @Override
        public long getCurrentProcessingTime() {
                return System.currentTimeMillis();
@@ -43,7 +43,10 @@ class NoOpTimerService extends TimeServiceProvider {
        }
 
        @Override
-       public void shutdownService() throws Exception {
+       public void quiesceAndAwaitPending() {}
+
+       @Override
+       public void shutdownService() {
                terminated = true;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8aea8c8f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
new file mode 100644
index 0000000..ae895b6
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import 
org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import org.junit.Test;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DefaultTimeServiceProviderTest {
+
+       @Test
+       public void testImmediateShutdown() throws Exception {
+
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+
+               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
+
+               try {
+                       assertFalse(timer.isTerminated());
+
+                       final OneShotLatch latch = new OneShotLatch();
+
+                       // the task should trigger immediately and should block 
until terminated with interruption
+                       timer.registerTimer(System.currentTimeMillis(), new 
Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       latch.trigger();
+                                       Thread.sleep(100000000);
+                               }
+                       });
+
+                       latch.await();
+                       timer.shutdownService();
+
+                       // can only enter this scope after the triggerable is 
interrupted
+                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
+                       synchronized (lock) {
+                               assertTrue(timer.isTerminated());
+                       }
+
+                       try {
+                               timer.registerTimer(System.currentTimeMillis() 
+ 1000, new Triggerable() {
+                                       @Override
+                                       public void trigger(long timestamp) {}
+                               });
+
+                               fail("should result in an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+
+                       // obviously, we have an asynchronous interrupted 
exception
+                       assertNotNull(errorRef.get());
+                       assertTrue(errorRef.get().getCause() instanceof 
InterruptedException);
+
+                       assertEquals(0, timer.getNumTasksScheduled());
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+
+       @Test
+       public void testQuiescing() throws Exception {
+
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+
+               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
+
+               try {
+                       final OneShotLatch latch = new OneShotLatch();
+
+                       final ReentrantLock scopeLock = new ReentrantLock();
+
+                       timer.registerTimer(System.currentTimeMillis() + 20, 
new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       scopeLock.lock();
+                                       try {
+                                               latch.trigger();
+                                               // delay a bit before leaving 
the method
+                                               Thread.sleep(5);
+                                       } finally {
+                                               scopeLock.unlock();
+                                       }
+                               }
+                       });
+
+                       // after the task triggered, shut the timer down 
cleanly, waiting for the task to finish
+                       latch.await();
+                       timer.quiesceAndAwaitPending();
+
+                       // should be able to immediately acquire the lock, 
since the task must have exited by now 
+                       assertTrue(scopeLock.tryLock());
+
+                       // should be able to schedule more tasks (that never 
get executed)
+                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       throw new Exception("test");
+                               }
+                       });
+                       assertNotNull(future);
+
+                       // nothing should be scheduled right now
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // check that no asynchronous error was reported - that 
ensures that the newly scheduled 
+                       // triggerable did, in fact, not trigger
+                       if (errorRef.get() != null) {
+                               throw new Exception(errorRef.get());
+                       }
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+
+       @Test
+       public void testFutureCancellation() throws Exception {
+
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+
+               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
+
+               try {
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // schedule something
+                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) {}
+                       });
+                       assertEquals(1, timer.getNumTasksScheduled());
+
+                       future.cancel(false);
+
+                       assertEquals(0, timer.getNumTasksScheduled());
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+}

Reply via email to