[FLINK-4700] [tests] Expand and harden TimeServiceProvider test

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4fc54e3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4fc54e3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4fc54e3e

Branch: refs/heads/master
Commit: 4fc54e3eb341a049529476ef966380d183d099d4
Parents: 8aea8c8
Author: Stephan Ewen <[email protected]>
Authored: Wed Oct 5 16:44:56 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 20:31:56 2016 +0200

----------------------------------------------------------------------
 .../AbstractFetcherTimestampsTest.java          |   2 +-
 .../runtime/operators/TestTimeProviderTest.java | 113 ++++++++
 .../runtime/operators/TimeProviderTest.java     | 269 -------------------
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +-
 .../tasks/DefaultTimeServiceProviderTest.java   | 136 +++++++++-
 6 files changed, 251 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index c3ba7b7..9b5d2e6 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import 
org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import 
org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
new file mode 100644
index 0000000..a8f2dc4
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TestTimeProviderTest {
+
+       @Test
+       public void testCustomTimeServiceProvider() throws Throwable {
+               TestTimeServiceProvider tp = new TestTimeServiceProvider();
+
+               final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<>();
+               mapTask.setTimeService(tp);
+
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
+                       mapTask, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
StreamTaskTimerTest.DummyMapFunction<String>());
+               streamConfig.setStreamOperator(mapOperator);
+
+               testHarness.invoke();
+
+               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
+
+               tp.setCurrentTime(11);
+               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
+
+               tp.setCurrentTime(15);
+               tp.setCurrentTime(16);
+               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
+
+               // register 2 tasks
+               mapTask.getTimerService().registerTimer(30, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+
+                       }
+               });
+
+               mapTask.getTimerService().registerTimer(40, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+
+                       }
+               });
+
+               assertEquals(2, tp.getNumRegisteredTimers());
+
+               tp.setCurrentTime(35);
+               assertEquals(1, tp.getNumRegisteredTimers());
+
+               tp.setCurrentTime(40);
+               assertEquals(0, tp.getNumRegisteredTimers());
+
+               tp.shutdownService();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public static class ReferenceSettingExceptionHandler implements 
AsyncExceptionHandler {
+
+               private final AtomicReference<Throwable> errorReference;
+
+               public 
ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
+                       this.errorReference = errorReference;
+               }
+
+               @Override
+               public void handleAsyncException(String message, Throwable 
exception) {
+                       errorReference.compareAndSet(null, exception);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
deleted file mode 100644
index 8d3e621..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-public class TimeProviderTest {
-
-       @Test
-       public void testDefaultTimeProvider() throws InterruptedException {
-               final OneShotLatch latch = new OneShotLatch();
-
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-               
-               TimeServiceProvider timeServiceProvider = new 
DefaultTimeServiceProvider(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
-
-               final List<Long> timestamps = new ArrayList<>();
-
-               long interval = 50L;
-               final long noOfTimers = 20;
-
-               // we add 2 timers per iteration minus the first that would 
have a negative timestamp
-               final long expectedNoOfTimers = 2 * noOfTimers;
-
-               for (int i = 0; i < noOfTimers; i++) {
-
-                       // we add a delay (100ms) so that both timers are 
inserted before the first is processed.
-                       // If not, and given that we add timers out of order, 
we may have a timer firing
-                       // before the next one (with smaller timestamp) is 
added.
-
-                       double nextTimer = 
timeServiceProvider.getCurrentProcessingTime() + 100 + i * interval;
-
-                       timeServiceProvider.registerTimer((long) nextTimer, new 
Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) throws 
Exception {
-                                       timestamps.add(timestamp);
-                                       if (timestamps.size() == 
expectedNoOfTimers) {
-                                               latch.trigger();
-                                       }
-                               }
-                       });
-
-                       // add also out-of-order tasks to verify that eventually
-                       // they will be executed in the correct order.
-
-                       timeServiceProvider.registerTimer((long) (nextTimer - 
10L), new Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) throws 
Exception {
-                                       timestamps.add(timestamp);
-                                       if (timestamps.size() == 
expectedNoOfTimers) {
-                                               latch.trigger();
-                                       }
-                               }
-                       });
-               }
-
-               if (!latch.isTriggered()) {
-                       latch.await();
-               }
-
-               Assert.assertEquals(timestamps.size(), expectedNoOfTimers);
-
-               // verify that the tasks are executed
-               // in ascending timestamp order
-
-               int counter = 0;
-               long lastTs = Long.MIN_VALUE;
-               for (long timestamp: timestamps) {
-                       Assert.assertTrue(timestamp >= lastTs);
-                       if (lastTs != Long.MIN_VALUE && counter % 2 == 1) {
-                               Assert.assertEquals((timestamp - lastTs), 10);
-                       }
-                       lastTs = timestamp;
-                       counter++;
-               }
-
-               assertNull(error.get());
-       }
-
-       @Test
-       public void testDefaultTimeProviderExceptionHandling() throws 
InterruptedException {
-               final OneShotLatch latch = new OneShotLatch();
-
-               final AtomicBoolean exceptionWasThrown = new 
AtomicBoolean(false);
-
-               final Object lock = new Object();
-
-               TimeServiceProvider timeServiceProvider = new 
DefaultTimeServiceProvider(
-                       new AsyncExceptionHandler() {
-                               @Override
-                               public void handleAsyncException(String 
message, Throwable exception) {
-                                       exceptionWasThrown.compareAndSet(false, 
true);
-                                       latch.trigger();
-                               }
-                       }, lock);
-
-               long now = System.currentTimeMillis();
-               timeServiceProvider.registerTimer(now, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) throws Exception {
-                               throw new Exception("Exception in Timer");
-                       }
-               });
-
-               if (!latch.isTriggered()) {
-                       latch.await();
-               }
-               Assert.assertTrue(exceptionWasThrown.get());
-       }
-
-       @Test
-       public void testTimerSorting() throws Exception {
-
-               final List<Long> result = new ArrayList<>();
-
-               TestTimeServiceProvider provider = new 
TestTimeServiceProvider();
-
-               provider.registerTimer(45, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-                               result.add(timestamp);
-                       }
-               });
-
-               provider.registerTimer(50, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-                               result.add(timestamp);
-                       }
-               });
-
-               provider.registerTimer(30, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-                               result.add(timestamp);
-                       }
-               });
-
-               provider.registerTimer(50, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-                               result.add(timestamp);
-                       }
-               });
-
-               Assert.assertEquals(provider.getNumRegisteredTimers(), 4);
-
-               provider.setCurrentTime(100);
-               long seen = 0;
-               for (Long l: result) {
-                       Assert.assertTrue(l >= seen);
-                       seen = l;
-               }
-       }
-
-       @Test
-       public void testCustomTimeServiceProvider() throws Throwable {
-               TestTimeServiceProvider tp = new TestTimeServiceProvider();
-
-               final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<>();
-               mapTask.setTimeService(tp);
-
-               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
-                       mapTask, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               StreamConfig streamConfig = testHarness.getStreamConfig();
-
-               StreamMap<String, String> mapOperator = new StreamMap<>(new 
StreamTaskTimerTest.DummyMapFunction<String>());
-               streamConfig.setStreamOperator(mapOperator);
-
-               testHarness.invoke();
-
-               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
-
-               tp.setCurrentTime(11);
-               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
-
-               tp.setCurrentTime(15);
-               tp.setCurrentTime(16);
-               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
-
-               // register 2 tasks
-               mapTask.getTimerService().registerTimer(30, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-
-                       }
-               });
-
-               mapTask.getTimerService().registerTimer(40, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-
-                       }
-               });
-
-               assertEquals(2, tp.getNumRegisteredTimers());
-
-               tp.setCurrentTime(35);
-               assertEquals(1, tp.getNumRegisteredTimers());
-
-               tp.setCurrentTime(40);
-               assertEquals(0, tp.getNumRegisteredTimers());
-
-               tp.shutdownService();
-       }
-
-       // 
------------------------------------------------------------------------
-
-       public static class ReferenceSettingExceptionHandler implements 
AsyncExceptionHandler {
-
-               private final AtomicReference<Throwable> errorReference;
-
-               public 
ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
-                       this.errorReference = errorReference;
-               }
-
-               @Override
-               public void handleAsyncException(String message, Throwable 
exception) {
-                       errorReference.compareAndSet(null, exception);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 4c6d391..2f687f6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,7 +39,7 @@ import 
org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import 
org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 88e28bc..cd82a9c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import 
org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import 
org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
index ae895b6..29e13ed 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
@@ -19,12 +19,14 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
-import 
org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
+import 
org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import org.junit.Test;
 
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -37,6 +39,40 @@ import static org.junit.Assert.fail;
 public class DefaultTimeServiceProviderTest {
 
        @Test
+       public void testTriggerHoldsLock() throws Exception {
+
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+
+               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
+
+               try {
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // schedule something
+                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) {
+                                       assertTrue(Thread.holdsLock(lock));
+                               }
+                       });
+
+                       // wait until the execution is over
+                       future.get();
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // check that no asynchronous error was reported
+                       if (errorRef.get() != null) {
+                               throw new Exception(errorRef.get());
+                       }
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+
+       @Test
        public void testImmediateShutdown() throws Exception {
 
                final Object lock = new Object();
@@ -171,6 +207,104 @@ public class DefaultTimeServiceProviderTest {
                        future.cancel(false);
 
                        assertEquals(0, timer.getNumTasksScheduled());
+
+                       // check that no asynchronous error was reported
+                       if (errorRef.get() != null) {
+                               throw new Exception(errorRef.get());
+                       }
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+
+       @Test
+       public void testExceptionReporting() throws InterruptedException {
+               final AtomicBoolean exceptionWasThrown = new 
AtomicBoolean(false);
+               final OneShotLatch latch = new OneShotLatch();
+               final Object lock = new Object();
+
+               TimeServiceProvider timeServiceProvider = new 
DefaultTimeServiceProvider(
+                               new AsyncExceptionHandler() {
+                                       @Override
+                                       public void handleAsyncException(String 
message, Throwable exception) {
+                                               exceptionWasThrown.set(true);
+                                               latch.trigger();
+                                       }
+                               }, lock);
+               
+               timeServiceProvider.registerTimer(System.currentTimeMillis(), 
new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) throws Exception {
+                               throw new Exception("Exception in Timer");
+                       }
+               });
+
+               latch.await();
+               assertTrue(exceptionWasThrown.get());
+       }
+
+       @Test
+       public void testTimerSorting() throws Exception {
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+
+               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
+
+               try {
+                       final OneShotLatch sync = new OneShotLatch();
+
+                       // we block the timer execution to make sure we have 
all the time
+                       // to register some additional timers out of order
+                       timer.registerTimer(System.currentTimeMillis(), new 
Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       sync.await();
+                               }
+                       });
+                       
+                       // schedule two timers out of order something
+                       final long now = System.currentTimeMillis();
+                       final long time1 = now + 6;
+                       final long time2 = now + 5;
+                       final long time3 = now + 8;
+                       final long time4 = now - 2;
+
+                       final ArrayBlockingQueue<Long> timestamps = new 
ArrayBlockingQueue<>(4);
+                       Triggerable trigger = new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) {
+                                       timestamps.add(timestamp);
+                               }
+                       };
+
+                       // schedule
+                       ScheduledFuture<?> future1 = timer.registerTimer(time1, 
trigger);
+                       ScheduledFuture<?> future2 = timer.registerTimer(time2, 
trigger);
+                       ScheduledFuture<?> future3 = timer.registerTimer(time3, 
trigger);
+                       ScheduledFuture<?> future4 = timer.registerTimer(time4, 
trigger);
+
+                       // now that everything is scheduled, unblock the timer 
service
+                       sync.trigger();
+
+                       // wait until both are complete
+                       future1.get();
+                       future2.get();
+                       future3.get();
+                       future4.get();
+
+                       // verify that the order is 4 - 2 - 1 - 3
+                       assertEquals(4, timestamps.size());
+                       assertEquals(time4, timestamps.take().longValue());
+                       assertEquals(time2, timestamps.take().longValue());
+                       assertEquals(time1, timestamps.take().longValue());
+                       assertEquals(time3, timestamps.take().longValue());
+
+                       // check that no asynchronous error was reported
+                       if (errorRef.get() != null) {
+                               throw new Exception(errorRef.get());
+                       }
                }
                finally {
                        timer.shutdownService();

Reply via email to