[FLINK-4700] [tests] Expand and harden TimeServiceProvider test
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4fc54e3e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4fc54e3e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4fc54e3e Branch: refs/heads/master Commit: 4fc54e3eb341a049529476ef966380d183d099d4 Parents: 8aea8c8 Author: Stephan Ewen <[email protected]> Authored: Wed Oct 5 16:44:56 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 20:31:56 2016 +0200 ---------------------------------------------------------------------- .../AbstractFetcherTimestampsTest.java | 2 +- .../runtime/operators/TestTimeProviderTest.java | 113 ++++++++ .../runtime/operators/TimeProviderTest.java | 269 ------------------- ...AlignedProcessingTimeWindowOperatorTest.java | 2 +- ...AlignedProcessingTimeWindowOperatorTest.java | 2 +- .../tasks/DefaultTimeServiceProviderTest.java | 136 +++++++++- 6 files changed, 251 insertions(+), 273 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java index c3ba7b7..9b5d2e6 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java @@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; -import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java new file mode 100644 index 0000000..a8f2dc4 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ResultPartitionWriter.class}) +@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) +public class TestTimeProviderTest { + + @Test + public void testCustomTimeServiceProvider() throws Throwable { + TestTimeServiceProvider tp = new TestTimeServiceProvider(); + + final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>(); + mapTask.setTimeService(tp); + + final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( + mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + StreamConfig streamConfig = testHarness.getStreamConfig(); + + StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>()); + streamConfig.setStreamOperator(mapOperator); + + testHarness.invoke(); + + assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0); + + tp.setCurrentTime(11); + assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11); + + tp.setCurrentTime(15); + tp.setCurrentTime(16); + assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16); + + // register 2 tasks + mapTask.getTimerService().registerTimer(30, new Triggerable() { + @Override + public void trigger(long timestamp) { + + } + }); + + mapTask.getTimerService().registerTimer(40, new Triggerable() { + @Override + public void trigger(long timestamp) { + + } + }); + + assertEquals(2, tp.getNumRegisteredTimers()); + + tp.setCurrentTime(35); + assertEquals(1, tp.getNumRegisteredTimers()); + + tp.setCurrentTime(40); + assertEquals(0, tp.getNumRegisteredTimers()); + + tp.shutdownService(); + } + + // ------------------------------------------------------------------------ + + public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler { + + private final AtomicReference<Throwable> errorReference; + + public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) { + this.errorReference = errorReference; + } + + @Override + public void handleAsyncException(String message, Throwable exception) { + errorReference.compareAndSet(null, exception); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java deleted file mode 100644 index 8d3e621..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.StreamMap; -import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler; -import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; -import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - -@RunWith(PowerMockRunner.class) -@PrepareForTest({ResultPartitionWriter.class}) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) -public class TimeProviderTest { - - @Test - public void testDefaultTimeProvider() throws InterruptedException { - final OneShotLatch latch = new OneShotLatch(); - - final Object lock = new Object(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider( - new ReferenceSettingExceptionHandler(error), lock); - - final List<Long> timestamps = new ArrayList<>(); - - long interval = 50L; - final long noOfTimers = 20; - - // we add 2 timers per iteration minus the first that would have a negative timestamp - final long expectedNoOfTimers = 2 * noOfTimers; - - for (int i = 0; i < noOfTimers; i++) { - - // we add a delay (100ms) so that both timers are inserted before the first is processed. - // If not, and given that we add timers out of order, we may have a timer firing - // before the next one (with smaller timestamp) is added. - - double nextTimer = timeServiceProvider.getCurrentProcessingTime() + 100 + i * interval; - - timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - timestamps.add(timestamp); - if (timestamps.size() == expectedNoOfTimers) { - latch.trigger(); - } - } - }); - - // add also out-of-order tasks to verify that eventually - // they will be executed in the correct order. - - timeServiceProvider.registerTimer((long) (nextTimer - 10L), new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - timestamps.add(timestamp); - if (timestamps.size() == expectedNoOfTimers) { - latch.trigger(); - } - } - }); - } - - if (!latch.isTriggered()) { - latch.await(); - } - - Assert.assertEquals(timestamps.size(), expectedNoOfTimers); - - // verify that the tasks are executed - // in ascending timestamp order - - int counter = 0; - long lastTs = Long.MIN_VALUE; - for (long timestamp: timestamps) { - Assert.assertTrue(timestamp >= lastTs); - if (lastTs != Long.MIN_VALUE && counter % 2 == 1) { - Assert.assertEquals((timestamp - lastTs), 10); - } - lastTs = timestamp; - counter++; - } - - assertNull(error.get()); - } - - @Test - public void testDefaultTimeProviderExceptionHandling() throws InterruptedException { - final OneShotLatch latch = new OneShotLatch(); - - final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); - - final Object lock = new Object(); - - TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider( - new AsyncExceptionHandler() { - @Override - public void handleAsyncException(String message, Throwable exception) { - exceptionWasThrown.compareAndSet(false, true); - latch.trigger(); - } - }, lock); - - long now = System.currentTimeMillis(); - timeServiceProvider.registerTimer(now, new Triggerable() { - @Override - public void trigger(long timestamp) throws Exception { - throw new Exception("Exception in Timer"); - } - }); - - if (!latch.isTriggered()) { - latch.await(); - } - Assert.assertTrue(exceptionWasThrown.get()); - } - - @Test - public void testTimerSorting() throws Exception { - - final List<Long> result = new ArrayList<>(); - - TestTimeServiceProvider provider = new TestTimeServiceProvider(); - - provider.registerTimer(45, new Triggerable() { - @Override - public void trigger(long timestamp) { - result.add(timestamp); - } - }); - - provider.registerTimer(50, new Triggerable() { - @Override - public void trigger(long timestamp) { - result.add(timestamp); - } - }); - - provider.registerTimer(30, new Triggerable() { - @Override - public void trigger(long timestamp) { - result.add(timestamp); - } - }); - - provider.registerTimer(50, new Triggerable() { - @Override - public void trigger(long timestamp) { - result.add(timestamp); - } - }); - - Assert.assertEquals(provider.getNumRegisteredTimers(), 4); - - provider.setCurrentTime(100); - long seen = 0; - for (Long l: result) { - Assert.assertTrue(l >= seen); - seen = l; - } - } - - @Test - public void testCustomTimeServiceProvider() throws Throwable { - TestTimeServiceProvider tp = new TestTimeServiceProvider(); - - final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>(); - mapTask.setTimeService(tp); - - final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( - mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - - StreamConfig streamConfig = testHarness.getStreamConfig(); - - StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>()); - streamConfig.setStreamOperator(mapOperator); - - testHarness.invoke(); - - assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0); - - tp.setCurrentTime(11); - assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11); - - tp.setCurrentTime(15); - tp.setCurrentTime(16); - assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16); - - // register 2 tasks - mapTask.getTimerService().registerTimer(30, new Triggerable() { - @Override - public void trigger(long timestamp) { - - } - }); - - mapTask.getTimerService().registerTimer(40, new Triggerable() { - @Override - public void trigger(long timestamp) { - - } - }); - - assertEquals(2, tp.getNumRegisteredTimers()); - - tp.setCurrentTime(35); - assertEquals(1, tp.getNumRegisteredTimers()); - - tp.setCurrentTime(40); - assertEquals(0, tp.getNumRegisteredTimers()); - - tp.shutdownService(); - } - - // ------------------------------------------------------------------------ - - public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler { - - private final AtomicReference<Throwable> errorReference; - - public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) { - this.errorReference = errorReference; - } - - @Override - public void handleAsyncException(String message, Throwable exception) { - errorReference.compareAndSet(null, exception); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 4c6d391..2f687f6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java index 88e28bc..cd82a9c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java @@ -40,7 +40,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc54e3e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java index ae895b6..29e13ed 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java @@ -19,12 +19,14 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler; +import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.junit.Test; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; @@ -37,6 +39,40 @@ import static org.junit.Assert.fail; public class DefaultTimeServiceProviderTest { @Test + public void testTriggerHoldsLock() throws Exception { + + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + assertEquals(0, timer.getNumTasksScheduled()); + + // schedule something + ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) { + assertTrue(Thread.holdsLock(lock)); + } + }); + + // wait until the execution is over + future.get(); + assertEquals(0, timer.getNumTasksScheduled()); + + // check that no asynchronous error was reported + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + } + finally { + timer.shutdownService(); + } + } + + @Test public void testImmediateShutdown() throws Exception { final Object lock = new Object(); @@ -171,6 +207,104 @@ public class DefaultTimeServiceProviderTest { future.cancel(false); assertEquals(0, timer.getNumTasksScheduled()); + + // check that no asynchronous error was reported + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } + } + finally { + timer.shutdownService(); + } + } + + @Test + public void testExceptionReporting() throws InterruptedException { + final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false); + final OneShotLatch latch = new OneShotLatch(); + final Object lock = new Object(); + + TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider( + new AsyncExceptionHandler() { + @Override + public void handleAsyncException(String message, Throwable exception) { + exceptionWasThrown.set(true); + latch.trigger(); + } + }, lock); + + timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + throw new Exception("Exception in Timer"); + } + }); + + latch.await(); + assertTrue(exceptionWasThrown.get()); + } + + @Test + public void testTimerSorting() throws Exception { + final Object lock = new Object(); + final AtomicReference<Throwable> errorRef = new AtomicReference<>(); + + final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider( + new ReferenceSettingExceptionHandler(errorRef), lock); + + try { + final OneShotLatch sync = new OneShotLatch(); + + // we block the timer execution to make sure we have all the time + // to register some additional timers out of order + timer.registerTimer(System.currentTimeMillis(), new Triggerable() { + @Override + public void trigger(long timestamp) throws Exception { + sync.await(); + } + }); + + // schedule two timers out of order something + final long now = System.currentTimeMillis(); + final long time1 = now + 6; + final long time2 = now + 5; + final long time3 = now + 8; + final long time4 = now - 2; + + final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4); + Triggerable trigger = new Triggerable() { + @Override + public void trigger(long timestamp) { + timestamps.add(timestamp); + } + }; + + // schedule + ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger); + ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger); + ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger); + ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger); + + // now that everything is scheduled, unblock the timer service + sync.trigger(); + + // wait until both are complete + future1.get(); + future2.get(); + future3.get(); + future4.get(); + + // verify that the order is 4 - 2 - 1 - 3 + assertEquals(4, timestamps.size()); + assertEquals(time4, timestamps.take().longValue()); + assertEquals(time2, timestamps.take().longValue()); + assertEquals(time1, timestamps.take().longValue()); + assertEquals(time3, timestamps.take().longValue()); + + // check that no asynchronous error was reported + if (errorRef.get() != null) { + throw new Exception(errorRef.get()); + } } finally { timer.shutdownService();
