Repository: reef Updated Branches: refs/heads/master addd62328 -> 65649348c
[REEF-1533] Make RuntimeClock more testable and create unit tests to check graceful and forceful shutdown of RuntimeClock * Refactor ClockTest for readability and add more Javadoc comments; * Add unit tests for graceful and forceful shutdown of the clock. * Move alarm producing functionality into a separate class and factor out the delay logic as an abstract method. * Add helper functions to generate random delays to AlarmProducer * Move EventRecorder and AlarmProducer into a separate package * Refactor the EventRecorder helper class - do not track timestamps separately from events + minor style cleanups. * Implement Clock.isClosed() method * Minor fixes in other unit tests around the RuntimeClock logic. * Log all event subscriptions at the beginning of the event loop in RuntimeClock * Switch to RealTimer instead of LogicalTimer in ClockTest.testAlarmOrder() to avoid race condition (will be addressed in JIRA [REEF-1537](https://issues.apache.org/jira/browse/REEF-1537)) JIRA: [REEF-1533](https://issues.apache.org/jira/browse/REEF-1533) close Pull request: Closes [#1099](https://github.com/apache/reef/pull/1099) Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/65649348 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/65649348 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/65649348 Branch: refs/heads/master Commit: 65649348ceceb5630e4c09c8237ab6a2640bd88b Parents: addd623 Author: Sergiy Matusevych <[email protected]> Authored: Fri Aug 12 14:07:04 2016 -0700 Committer: Boris Shulman <[email protected]> Committed: Fri Aug 26 15:03:50 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/reef/wake/time/Clock.java | 18 +- .../reef/wake/time/runtime/RuntimeClock.java | 25 +- .../apache/reef/wake/test/time/ClockTest.java | 262 ----------------- .../reef/wake/test/time/RuntimeClockTest.java | 293 +++++++++++++++++++ 
.../reef/wake/test/time/util/AlarmProducer.java | 90 ++++++ .../reef/wake/test/time/util/EventRecorder.java | 84 ++++++ .../reef/wake/test/time/util/package-info.java | 22 ++ 7 files changed, 523 insertions(+), 271 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java index ac7e86e..966e30f 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java @@ -43,12 +43,12 @@ public interface Clock extends Runnable, AutoCloseable { /** * Schedule a TimerEvent at the given future offset. - * - * @param handler to be called - * @param offset into the future - * @throws IllegalStateException when the clock has been already closed + * @param handler Event handler to be called on alarm. + * @param offset Offset into the future in milliseconds. + * @return Newly scheduled alarm. + * @throws IllegalStateException When the clock has been already closed. */ - void scheduleAlarm(final int offset, final EventHandler<Alarm> handler); + Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler); /** * This will stop the clock after all client alarms @@ -78,6 +78,14 @@ public interface Clock extends Runnable, AutoCloseable { boolean isIdle(); /** + * Clock is closed after a call to stop() or close(). + * A closed clock cannot add new alarms to the schedule, but, in case of the + * graceful shutdown, can still invoke previously scheduled ones. + * @return true if closed, false otherwise. 
+ */ + boolean isClosed(); + + /** * Bind this to an event handler to statically subscribe to the StartTime Event. */ @NamedParameter(default_class = MissingStartHandlerHandler.class, doc = "Will be called upon the start event") http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java index 43a25ac..64ae63a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java @@ -115,10 +115,11 @@ public final class RuntimeClock implements Clock { * and supply an event handler to be called at that time. * @param offset Number of milliseconds into the future relative to current time. * @param handler Event handler to be invoked. + * @return Newly scheduled alarm. * @throws IllegalStateException if the clock is already closed. 
*/ @Override - public void scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { + public Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { final Time alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler); @@ -140,8 +141,10 @@ public final class RuntimeClock implements Clock { ++this.numClientAlarms; this.schedule.add(alarm); - this.schedule.notifyAll(); + this.schedule.notify(); } + + return alarm; } /** @@ -182,7 +185,7 @@ public final class RuntimeClock implements Clock { this.schedule.clear(); this.schedule.add(stopEvent); - this.schedule.notifyAll(); + this.schedule.notify(); } LOG.exiting(CLASS_NAME, "stop"); @@ -209,7 +212,7 @@ public final class RuntimeClock implements Clock { LOG.log(Level.FINE, "Graceful shutdown scheduled: {0}", stopEvent); this.schedule.add(stopEvent); - this.schedule.notifyAll(); + this.schedule.notify(); } LOG.exiting(CLASS_NAME, "close"); @@ -228,6 +231,19 @@ public final class RuntimeClock implements Clock { } /** + * The clock is closed after a call to stop() or close(). + * A closed clock cannot add new alarms to the schedule, but, in case of the + * graceful shutdown, can still invoke previously scheduled ones. + * @return true if closed, false otherwise. + */ + @Override + public boolean isClosed() { + synchronized (this.schedule) { + return this.isClosed; + } + } + + /** * Register event handlers for the given event class. * @param eventClass Event type to handle. Must be derived from Time. * @param handlers One or many event handlers that can process given event type. 
@@ -236,6 +252,7 @@ public final class RuntimeClock implements Clock { @SuppressWarnings("checkstyle:hiddenfield") private <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) { for (final EventHandler<T> handler : handlers) { + LOG.log(Level.FINEST, "Subscribe: event {0} handler {1}", new Object[] {eventClass.getName(), handler}); this.handlers.subscribe(eventClass, handler); } } http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java deleted file mode 100644 index afb6866..0000000 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.reef.wake.test.time; - -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.apache.reef.wake.EventHandler; -import org.apache.reef.wake.impl.LoggingUtils; -import org.apache.reef.wake.impl.ThreadPoolStage; -import org.apache.reef.wake.time.Time; -import org.apache.reef.wake.time.event.Alarm; -import org.apache.reef.wake.time.runtime.LogicalTimer; -import org.apache.reef.wake.time.runtime.RealTimer; -import org.apache.reef.wake.time.runtime.RuntimeClock; -import org.apache.reef.wake.time.runtime.Timer; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; - -/** - * Tests for Clock. - */ -public class ClockTest { - - private static final Tang TANG = Tang.Factory.getTang(); - - private static RuntimeClock buildClock( - final Class<? 
extends Timer> timerClass) throws InjectionException { - - final Configuration clockConfig = TANG.newConfigurationBuilder() - .bind(Timer.class, timerClass) - .build(); - - return TANG.newInjector(clockConfig).getInstance(RuntimeClock.class); - } - - @Test - public void testClock() throws Exception { - - LoggingUtils.setLoggingLevel(Level.FINEST); - - final int minEvents = 40; - final CountDownLatch eventCountLatch = new CountDownLatch(minEvents); - - try (final RuntimeClock clock = buildClock(RealTimer.class)) { - - new Thread(clock).start(); - - final RandomAlarmProducer alarmProducer = new RandomAlarmProducer(clock, eventCountLatch); - - try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) { - stage.onNext(null); - Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS)); - } - } - } - - @Test - public void testAlarmRegistrationRaceConditions() throws Exception { - - LoggingUtils.setLoggingLevel(Level.FINEST); - - try (final RuntimeClock clock = buildClock(RealTimer.class)) { - - new Thread(clock).start(); - - final EventRecorder earlierAlarmRecorder = new EventRecorder(); - final EventRecorder laterAlarmRecorder = new EventRecorder(); - - // Schedule an Alarm that's far in the future - clock.scheduleAlarm(5000, laterAlarmRecorder); - Thread.sleep(1000); - - // By now, RuntimeClockImpl should be in a timed wait() for 5000 ms. 
- // Scheduler an Alarm that should fire before the existing Alarm: - clock.scheduleAlarm(2000, earlierAlarmRecorder); - Thread.sleep(1000); - - // The earlier Alarm shouldn't have fired yet (we've only slept 1/2 time): - Assert.assertEquals(0, earlierAlarmRecorder.getEventCount()); - Thread.sleep(1500); - - // The earlier Alarm should have fired, since 3500 > 2000 ms have passed: - Assert.assertEquals(1, earlierAlarmRecorder.getEventCount()); - // And the later Alarm shouldn't have fired yet: - Assert.assertEquals(0, laterAlarmRecorder.getEventCount()); - Thread.sleep(2500); - - // The later Alarm should have fired, since 6000 > 5000 ms have passed: - Assert.assertEquals(1, laterAlarmRecorder.getEventCount()); - } - } - - @Test - public void testMultipleCloseCalls() throws Exception { - - LoggingUtils.setLoggingLevel(Level.FINEST); - - final int numThreads = 3; - final CountDownLatch eventCountLatch = new CountDownLatch(numThreads); - - try (final RuntimeClock clock = buildClock(RealTimer.class)) { - - final EventHandler<Alarm> handler = new EventHandler<Alarm>() { - @Override - public void onNext(final Alarm value) { - clock.close(); - eventCountLatch.countDown(); - } - }; - - new Thread(clock).start(); - - try (final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(handler, numThreads)) { - - for (int i = 0; i < numThreads; ++i) { - stage.onNext(null); - } - - Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS)); - } - } - } - - @Test - public void testSimultaneousAlarms() throws Exception { - - LoggingUtils.setLoggingLevel(Level.FINEST); - - final int expectedEvent = 2; - final CountDownLatch eventCountLatch = new CountDownLatch(expectedEvent); - - try (final RuntimeClock clock = buildClock(LogicalTimer.class)) { - - new Thread(clock).start(); - - final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); - - clock.scheduleAlarm(500, alarmRecorder); - clock.scheduleAlarm(500, alarmRecorder); - - eventCountLatch.await(10, 
TimeUnit.SECONDS); - - Assert.assertEquals(expectedEvent, alarmRecorder.getEventCount()); - } - } - - @Test - public void testAlarmOrder() throws Exception { - - LoggingUtils.setLoggingLevel(Level.FINEST); - - final int numAlarms = 10; - final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms); - - try (final RuntimeClock clock = buildClock(LogicalTimer.class)) { - - new Thread(clock).start(); - - final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); - - final long[] expected = new long[numAlarms]; - for (int i = 0; i < numAlarms; ++i) { - clock.scheduleAlarm(i * 100, alarmRecorder); - expected[i] = i * 100; - } - - eventCountLatch.await(10, TimeUnit.SECONDS); - - int i = 0; - final long[] actual = new long[numAlarms]; - for (final long ts : alarmRecorder.getTimestamps()) { - actual[i++] = ts; - } - - Assert.assertArrayEquals(expected, actual); - } - } - - /** - * An EventHandler that records the events that it sees. - */ - private static class EventRecorder implements EventHandler<Alarm> { - - /** - * A synchronized List of the events recorded by this EventRecorder. 
- */ - private final List<Time> events = Collections.synchronizedList(new ArrayList<Time>()); - private final List<Long> timestamps = Collections.synchronizedList(new ArrayList<Long>()); - - private final CountDownLatch eventCountLatch; - - EventRecorder() { - this(null); - } - - EventRecorder(final CountDownLatch latch) { - eventCountLatch = latch; - } - - public int getEventCount() { - return events.size(); - } - - public List<Long> getTimestamps() { - return timestamps; - } - - @Override - public void onNext(final Alarm event) { - timestamps.add(event.getTimestamp()); - events.add(event); - if (eventCountLatch != null) { - eventCountLatch.countDown(); - } - } - } - - private static class RandomAlarmProducer implements EventHandler<Alarm> { - - private final RuntimeClock clock; - private final CountDownLatch eventCountLatch; - private final Random rand; - - RandomAlarmProducer(final RuntimeClock clock, final CountDownLatch latch) { - this.clock = clock; - this.eventCountLatch = latch; - this.rand = new Random(); - } - - @Override - public void onNext(final Alarm value) { - final int duration = rand.nextInt(100) + 1; - clock.scheduleAlarm(duration, this); - eventCountLatch.countDown(); - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/RuntimeClockTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/RuntimeClockTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/RuntimeClockTest.java new file mode 100644 index 0000000..d3b2b11 --- /dev/null +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/RuntimeClockTest.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.test.time; + +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.impl.LoggingUtils; +import org.apache.reef.wake.impl.ThreadPoolStage; +import org.apache.reef.wake.test.time.util.AlarmProducer; +import org.apache.reef.wake.test.time.util.EventRecorder; +import org.apache.reef.wake.time.Time; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.wake.time.runtime.LogicalTimer; +import org.apache.reef.wake.time.runtime.RealTimer; +import org.apache.reef.wake.time.runtime.RuntimeClock; +import org.apache.reef.wake.time.runtime.Timer; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +/** + * Tests for RuntimeClock event loop. + */ +public class RuntimeClockTest { + + private static final Tang TANG = Tang.Factory.getTang(); + + private final Random rand = new Random(); + + /** + * Create new RuntimeClock object injected with the given timer. + * + * @param timerClass Timer to use inside the RuntimeClock. 
Must implement the Timer interface. + * @return A new instance of the RuntimeClock, instrumented with the given timer. + * @throws InjectionException On configuration error. + */ + private static RuntimeClock buildClock( + final Class<? extends Timer> timerClass) throws InjectionException { + + final Configuration clockConfig = TANG.newConfigurationBuilder() + .bind(Timer.class, timerClass) + .build(); + + return TANG.newInjector(clockConfig).getInstance(RuntimeClock.class); + } + + /** + * Create 10 threads to produce 40 alarms at random intervals + * and check if all alarms get processed. + * @throws Exception ThreadPoolStage can throw anything. + */ + @Test + public void testClock() throws Exception { + + LoggingUtils.setLoggingLevel(Level.FINEST); + + try (final RuntimeClock clock = buildClock(RealTimer.class)) { + + new Thread(clock).start(); + + final CountDownLatch eventCountLatch = new CountDownLatch(40); + final AlarmProducer alarmProducer = new AlarmProducer(clock, eventCountLatch) { + @Override + public int getOffset() { + return randomOffsetUniform(rand, 1, 100); + } + }; + + try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) { + stage.onNext(null); + Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS)); + } + } + } + + @Test + public void testAlarmRegistrationRaceConditions() throws Exception { + + LoggingUtils.setLoggingLevel(Level.FINEST); + + try (final RuntimeClock clock = buildClock(RealTimer.class)) { + + new Thread(clock).start(); + + final EventRecorder earlierAlarmRecorder = new EventRecorder(); + final EventRecorder laterAlarmRecorder = new EventRecorder(); + + // Schedule an Alarm that's far in the future + clock.scheduleAlarm(5000, laterAlarmRecorder); + Thread.sleep(1000); + + // By now, RuntimeClockImpl should be in a timed wait() for 5000 ms. 
+ // Scheduler an Alarm that should fire before the existing Alarm: + clock.scheduleAlarm(2000, earlierAlarmRecorder); + Thread.sleep(1000); + + // The earlier Alarm shouldn't have fired yet (we've only slept 1/2 time): + Assert.assertEquals(0, earlierAlarmRecorder.getEventCount()); + Thread.sleep(1500); + + // The earlier Alarm should have fired, since 3500 > 2000 ms have passed: + Assert.assertEquals(1, earlierAlarmRecorder.getEventCount()); + // And the later Alarm shouldn't have fired yet: + Assert.assertEquals(0, laterAlarmRecorder.getEventCount()); + Thread.sleep(2500); + + // The later Alarm should have fired, since 6000 > 5000 ms have passed: + Assert.assertEquals(1, laterAlarmRecorder.getEventCount()); + } + } + + @Test + public void testMultipleCloseCalls() throws Exception { + + LoggingUtils.setLoggingLevel(Level.FINEST); + + final int numThreads = 3; + final CountDownLatch eventCountLatch = new CountDownLatch(numThreads); + + try (final RuntimeClock clock = buildClock(RealTimer.class)) { + + final EventHandler<Alarm> handler = new EventHandler<Alarm>() { + @Override + public void onNext(final Alarm value) { + clock.close(); + eventCountLatch.countDown(); + } + }; + + new Thread(clock).start(); + + try (final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(handler, numThreads)) { + + for (int i = 0; i < numThreads; ++i) { + stage.onNext(null); + } + + Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS)); + } + } + } + + @Test + public void testSimultaneousAlarms() throws Exception { + + LoggingUtils.setLoggingLevel(Level.FINEST); + + final int expectedEvent = 2; + final CountDownLatch eventCountLatch = new CountDownLatch(expectedEvent); + + try (final RuntimeClock clock = buildClock(LogicalTimer.class)) { + + new Thread(clock).start(); + + final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); + + clock.scheduleAlarm(500, alarmRecorder); + clock.scheduleAlarm(500, alarmRecorder); + + eventCountLatch.await(10, 
TimeUnit.SECONDS); + + Assert.assertEquals(expectedEvent, alarmRecorder.getEventCount()); + } + } + + @Test + public void testAlarmOrder() throws Exception { + + LoggingUtils.setLoggingLevel(Level.FINEST); + + final int numAlarms = 10; + final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms); + final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); + + final long[] expected = new long[numAlarms]; + + try (final RuntimeClock clock = buildClock(RealTimer.class)) { + + new Thread(clock).start(); + + for (int i = 0; i < numAlarms; ++i) { + final Time event = clock.scheduleAlarm(i * 100, alarmRecorder); + expected[i] = event.getTimestamp(); + } + } + + eventCountLatch.await(10, TimeUnit.SECONDS); + + int i = 0; + final long[] actual = new long[numAlarms]; + for (final Time event : alarmRecorder.getEvents()) { + actual[i++] = event.getTimestamp(); + } + + Assert.assertEquals( + "Number of alarms does not match the expected count", + numAlarms, alarmRecorder.getEventCount()); + + Assert.assertArrayEquals("Alarms processed in the wrong order", expected, actual); + } + + /** + * Test graceful shutdown of the event loop. + * Schedule two events and close the clock. Make sure that no events occur soon after + * closing the alarm and both of them occur at the scheduled time. Check that the clock + * is closed after that. + * @throws InjectionException Error building a runtime clock object. + * @throws InterruptedException Sleep interrupted. 
+ */ + @Test + public void testGracefulClose() throws InjectionException, InterruptedException { + + LoggingUtils.setLoggingLevel(Level.FINEST); + + final int numAlarms = 2; + final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms); + final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); + + final RuntimeClock clock = buildClock(RealTimer.class); + new Thread(clock).start(); + + clock.scheduleAlarm(100, alarmRecorder); + clock.scheduleAlarm(101, alarmRecorder); + clock.close(); + + Assert.assertFalse("Clock cannot be idle yet", clock.isIdle()); + Assert.assertTrue("Clock must be in closed state", clock.isClosed()); + + Thread.sleep(10); + Assert.assertTrue( + "No events should occur immediately after the graceful shutdown", + alarmRecorder.getEvents().isEmpty()); + + Thread.sleep(200); + final List<Time> events = alarmRecorder.getEvents(); + Assert.assertEquals("Expected events on graceful shutdown", events.size(), numAlarms); + + Assert.assertTrue("No client alarms should be scheduled at this time", clock.isIdle()); + Assert.assertTrue("Clock must be in closed state", clock.isClosed()); + } + + /** + * Test forceful shutdown of the event loop. Schedule two events and close the clock. + * Make sure that no events occur after that and the clock is in closed and idle state. + * @throws InjectionException Error building a runtime clock object. + * @throws InterruptedException Sleep interrupted. 
+ */ + @Test + public void testForcefulStop() throws InjectionException, InterruptedException { + + LoggingUtils.setLoggingLevel(Level.FINEST); + + final int numAlarms = 2; + final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms); + final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); + + final RuntimeClock clock = buildClock(RealTimer.class); + new Thread(clock).start(); + + clock.scheduleAlarm(100, alarmRecorder); + clock.scheduleAlarm(101, alarmRecorder); + clock.stop(); + + Assert.assertTrue("Clock must be idle already", clock.isIdle()); + Assert.assertTrue("Clock must be in closed state", clock.isClosed()); + + Thread.sleep(200); + Assert.assertTrue("No events should be in the schedule", alarmRecorder.getEvents().isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/AlarmProducer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/AlarmProducer.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/AlarmProducer.java new file mode 100644 index 0000000..966d039 --- /dev/null +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/AlarmProducer.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.test.time.util; + +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.wake.time.runtime.RuntimeClock; + +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +/** + * Helper class used in unit tests to generate alarms at user-specified intervals + * and count down the barrier on each alarm. It is used in RuntimeClockTest. + */ +public abstract class AlarmProducer implements EventHandler<Alarm> { + + private final RuntimeClock clock; + private final CountDownLatch eventCountLatch; + + /** + * Create a new alarm producer. + * @param clock Event loop that processes the schedule and invokes alarm handlers. + * @param latch A barrier with the counter that gets decremented after each alarm. + */ + public AlarmProducer(final RuntimeClock clock, final CountDownLatch latch) { + this.clock = clock; + this.eventCountLatch = latch; + } + + /** + * On each alarm, schedule the next one and decrement the latch. + * @param value An alarm to process. + */ + @Override + public void onNext(final Alarm value) { + this.clock.scheduleAlarm(this.getOffset(), this); + this.eventCountLatch.countDown(); + } + + /** + * Return offset from current time in milliseconds for when to schedule an alarm. + * @return Time offset in milliseconds. Must be a positive number. + */ + public abstract int getOffset(); + + /** + * Generate random integer uniformly distributed between offsetMin and offsetMax. + * Helper function to be used in getOffset(). 
+ * @param rand Random number generator, and instance of Random. + * @param offsetMin Lower bound, must be > 0. + * @param offsetMax Upper bound, must be > offsetMin. + * @return Random integer uniformly distributed between offsetMin and offsetMax. + */ + public static int randomOffsetUniform(final Random rand, final int offsetMin, final int offsetMax) { + assert offsetMin > 0; + assert offsetMin <= offsetMax; + return rand.nextInt(offsetMax - offsetMin + 1) + offsetMin; + } + + /** + * Generate random integer drawn from Poisson distribution. + * Helper function to be used in getOffset(). Always returns a positive integer. + * We use normal distribution with mu=lambda and sigma=sqrt(lambda) to approximate + * the Poisson distribution. + * @param rand Random number generator, and instance of Random. + * @param lambda Parameter of the Poisson distribution, must be > 0. + * @return A rounded absolute value of a random number drawn from a Poisson distribution. + */ + public static int randomOffsetPoisson(final Random rand, final double lambda) { + assert lambda > 0; + return (int)Math.round(Math.abs(rand.nextGaussian() * Math.sqrt(lambda) + lambda) + 1); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/EventRecorder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/EventRecorder.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/EventRecorder.java new file mode 100644 index 0000000..0e63599 --- /dev/null +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/EventRecorder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.wake.test.time.util; + +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Time; +import org.apache.reef.wake.time.event.Alarm; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * An EventHandler that records the events that it sees. + * Can optionally count down the latch on each alarm it receives. + * This is a helper class to be used in unit tests, e.g. RuntimeClockTest. + */ +public final class EventRecorder implements EventHandler<Alarm> { + + /** + * A synchronized List of the events recorded by this EventRecorder. + */ + private final List<Time> events = Collections.synchronizedList(new ArrayList<Time>()); + + private final CountDownLatch eventCountLatch; + + public EventRecorder() { + this(null); + } + + /** + * Create a new event recorder. + * If latch is not null, count it down on each event received. + * @param latch A count down latch. Can be null. + */ + public EventRecorder(final CountDownLatch latch) { + this.eventCountLatch = latch; + } + + /** + * Get the number of events captured so far. + * @return A number of events captured. 
+ */ + public int getEventCount() { + return this.events.size(); + } + + /** + * Get the list of events captured so far, in the order they were captured. + * @return A list of events. It can be empty, but never null. + */ + public List<Time> getEvents() { + return this.events; + } + + /** + * Add a new event to the list. + * @param event An event to capture. + */ + @Override + public void onNext(final Alarm event) { + this.events.add(event); + if (this.eventCountLatch != null) { + this.eventCountLatch.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/package-info.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/package-info.java new file mode 100644 index 0000000..6677e2e --- /dev/null +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/** + * Helper classes for unit tests in org.apache.reef.wake.test.time package. + */ +package org.apache.reef.wake.test.time.util;
