Repository: reef
Updated Branches:
  refs/heads/master addd62328 -> 65649348c


[REEF-1533] Make RuntimeClock more testable and create unit tests to check 
graceful and forceful shutdown of RuntimeClock

  * Refactor ClockTest for readability and add more Javadoc comments;
  * Add unit tests for graceful and forceful shutdown of the clock.
  * Move alarm producing functionality into a separate class and factor out the 
delay logic as an abstract method.
  * Add helper functions to generate random delays to AlarmProducer
  * Move EventRecorder and AlarmProducer into a separate package
  * Refactor the EventRecorder helper class - do not track timestamps 
separately from events + minor style cleanups.
  * Implement Clock.isClosed() method
  * Minor fixes in other unit tests around the RuntimeClock logic.
  * Log all event subscriptions at the beginning of the event loop in 
RuntimeClock
  * Switch to RealTimer instead of LogicalTimer in ClockTest.testAlarmOrder() 
to avoid race condition (will be addressed in JIRA 
[REEF-1537](https://issues.apache.org/jira/browse/REEF-1537))

JIRA:
  [REEF-1533](https://issues.apache.org/jira/browse/REEF-1533) close

Pull request:
  Closes [#1099](https://github.com/apache/reef/pull/1099)


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/65649348
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/65649348
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/65649348

Branch: refs/heads/master
Commit: 65649348ceceb5630e4c09c8237ab6a2640bd88b
Parents: addd623
Author: Sergiy Matusevych <[email protected]>
Authored: Fri Aug 12 14:07:04 2016 -0700
Committer: Boris Shulman <[email protected]>
Committed: Fri Aug 26 15:03:50 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/reef/wake/time/Clock.java   |  18 +-
 .../reef/wake/time/runtime/RuntimeClock.java    |  25 +-
 .../apache/reef/wake/test/time/ClockTest.java   | 262 -----------------
 .../reef/wake/test/time/RuntimeClockTest.java   | 293 +++++++++++++++++++
 .../reef/wake/test/time/util/AlarmProducer.java |  90 ++++++
 .../reef/wake/test/time/util/EventRecorder.java |  84 ++++++
 .../reef/wake/test/time/util/package-info.java  |  22 ++
 7 files changed, 523 insertions(+), 271 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java
index ac7e86e..966e30f 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Clock.java
@@ -43,12 +43,12 @@ public interface Clock extends Runnable, AutoCloseable {
 
   /**
    * Schedule a TimerEvent at the given future offset.
-   *
-   * @param handler to be called
-   * @param offset  into the future
-   * @throws IllegalStateException when the clock has been already closed
+   * @param handler Event handler to be called on alarm.
+   * @param offset Offset into the future in milliseconds.
+   * @return Newly scheduled alarm.
+   * @throws IllegalStateException When the clock has been already closed.
    */
-  void scheduleAlarm(final int offset, final EventHandler<Alarm> handler);
+  Time scheduleAlarm(final int offset, final EventHandler<Alarm> handler);
 
   /**
    * This will stop the clock after all client alarms
@@ -78,6 +78,14 @@ public interface Clock extends Runnable, AutoCloseable {
   boolean isIdle();
 
   /**
+   * Clock is closed after a call to stop() or close().
+   * A closed clock cannot add new alarms to the schedule, but, in case of the
+   * graceful shutdown, can still invoke previously scheduled ones.
+   * @return true if closed, false otherwise.
+   */
+  boolean isClosed();
+
+  /**
    * Bind this to an event handler to statically subscribe to the StartTime 
Event.
    */
   @NamedParameter(default_class = MissingStartHandlerHandler.class, doc = 
"Will be called upon the start event")

http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
index 43a25ac..64ae63a 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java
@@ -115,10 +115,11 @@ public final class RuntimeClock implements Clock {
    * and supply an event handler to be called at that time.
    * @param offset Number of milliseconds into the future relative to current 
time.
    * @param handler Event handler to be invoked.
+   * @return Newly scheduled alarm.
    * @throws IllegalStateException if the clock is already closed.
    */
   @Override
-  public void scheduleAlarm(final int offset, final EventHandler<Alarm> 
handler) {
+  public Time scheduleAlarm(final int offset, final EventHandler<Alarm> 
handler) {
 
     final Time alarm = new ClientAlarm(this.timer.getCurrent() + offset, 
handler);
 
@@ -140,8 +141,10 @@ public final class RuntimeClock implements Clock {
       ++this.numClientAlarms;
 
       this.schedule.add(alarm);
-      this.schedule.notifyAll();
+      this.schedule.notify();
     }
+
+    return alarm;
   }
 
   /**
@@ -182,7 +185,7 @@ public final class RuntimeClock implements Clock {
 
       this.schedule.clear();
       this.schedule.add(stopEvent);
-      this.schedule.notifyAll();
+      this.schedule.notify();
     }
 
     LOG.exiting(CLASS_NAME, "stop");
@@ -209,7 +212,7 @@ public final class RuntimeClock implements Clock {
       LOG.log(Level.FINE, "Graceful shutdown scheduled: {0}", stopEvent);
 
       this.schedule.add(stopEvent);
-      this.schedule.notifyAll();
+      this.schedule.notify();
     }
 
     LOG.exiting(CLASS_NAME, "close");
@@ -228,6 +231,19 @@ public final class RuntimeClock implements Clock {
   }
 
   /**
+   * The clock is closed after a call to stop() or close().
+   * A closed clock cannot add new alarms to the schedule, but, in case of the
+   * graceful shutdown, can still invoke previously scheduled ones.
+   * @return true if closed, false otherwise.
+   */
+  @Override
+  public boolean isClosed() {
+    synchronized (this.schedule) {
+      return this.isClosed;
+    }
+  }
+
+  /**
    * Register event handlers for the given event class.
    * @param eventClass Event type to handle. Must be derived from Time.
    * @param handlers One or many event handlers that can process given event 
type.
@@ -236,6 +252,7 @@ public final class RuntimeClock implements Clock {
   @SuppressWarnings("checkstyle:hiddenfield")
   private <T extends Time> void subscribe(final Class<T> eventClass, final 
Set<EventHandler<T>> handlers) {
     for (final EventHandler<T> handler : handlers) {
+      LOG.log(Level.FINEST, "Subscribe: event {0} handler {1}", new Object[] 
{eventClass.getName(), handler});
       this.handlers.subscribe(eventClass, handler);
     }
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
deleted file mode 100644
index afb6866..0000000
--- 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.wake.test.time;
-
-import org.apache.reef.tang.Configuration;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.impl.LoggingUtils;
-import org.apache.reef.wake.impl.ThreadPoolStage;
-import org.apache.reef.wake.time.Time;
-import org.apache.reef.wake.time.event.Alarm;
-import org.apache.reef.wake.time.runtime.LogicalTimer;
-import org.apache.reef.wake.time.runtime.RealTimer;
-import org.apache.reef.wake.time.runtime.RuntimeClock;
-import org.apache.reef.wake.time.runtime.Timer;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-
-/**
- * Tests for Clock.
- */
-public class ClockTest {
-
-  private static final Tang TANG = Tang.Factory.getTang();
-
-  private static RuntimeClock buildClock(
-      final Class<? extends Timer> timerClass) throws InjectionException {
-
-    final Configuration clockConfig = TANG.newConfigurationBuilder()
-        .bind(Timer.class, timerClass)
-        .build();
-
-    return TANG.newInjector(clockConfig).getInstance(RuntimeClock.class);
-  }
-
-  @Test
-  public void testClock() throws Exception {
-
-    LoggingUtils.setLoggingLevel(Level.FINEST);
-
-    final int minEvents = 40;
-    final CountDownLatch eventCountLatch = new CountDownLatch(minEvents);
-
-    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
-
-      new Thread(clock).start();
-
-      final RandomAlarmProducer alarmProducer = new RandomAlarmProducer(clock, 
eventCountLatch);
-
-      try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 
10)) {
-        stage.onNext(null);
-        Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
-      }
-    }
-  }
-
-  @Test
-  public void testAlarmRegistrationRaceConditions() throws Exception {
-
-    LoggingUtils.setLoggingLevel(Level.FINEST);
-
-    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
-
-      new Thread(clock).start();
-
-      final EventRecorder earlierAlarmRecorder = new EventRecorder();
-      final EventRecorder laterAlarmRecorder = new EventRecorder();
-
-      // Schedule an Alarm that's far in the future
-      clock.scheduleAlarm(5000, laterAlarmRecorder);
-      Thread.sleep(1000);
-
-      // By now, RuntimeClockImpl should be in a timed wait() for 5000 ms.
-      // Scheduler an Alarm that should fire before the existing Alarm:
-      clock.scheduleAlarm(2000, earlierAlarmRecorder);
-      Thread.sleep(1000);
-
-      // The earlier Alarm shouldn't have fired yet (we've only slept 1/2 
time):
-      Assert.assertEquals(0, earlierAlarmRecorder.getEventCount());
-      Thread.sleep(1500);
-
-      // The earlier Alarm should have fired, since 3500 > 2000 ms have passed:
-      Assert.assertEquals(1, earlierAlarmRecorder.getEventCount());
-      // And the later Alarm shouldn't have fired yet:
-      Assert.assertEquals(0, laterAlarmRecorder.getEventCount());
-      Thread.sleep(2500);
-
-      // The later Alarm should have fired, since 6000 > 5000 ms have passed:
-      Assert.assertEquals(1, laterAlarmRecorder.getEventCount());
-    }
-  }
-
-  @Test
-  public void testMultipleCloseCalls() throws Exception {
-
-    LoggingUtils.setLoggingLevel(Level.FINEST);
-
-    final int numThreads = 3;
-    final CountDownLatch eventCountLatch = new CountDownLatch(numThreads);
-
-    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
-
-      final EventHandler<Alarm> handler = new EventHandler<Alarm>() {
-        @Override
-        public void onNext(final Alarm value) {
-          clock.close();
-          eventCountLatch.countDown();
-        }
-      };
-
-      new Thread(clock).start();
-
-      try (final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(handler, 
numThreads)) {
-
-        for (int i = 0; i < numThreads; ++i) {
-          stage.onNext(null);
-        }
-
-        Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
-      }
-    }
-  }
-
-  @Test
-  public void testSimultaneousAlarms() throws Exception {
-
-    LoggingUtils.setLoggingLevel(Level.FINEST);
-
-    final int expectedEvent = 2;
-    final CountDownLatch eventCountLatch = new CountDownLatch(expectedEvent);
-
-    try (final RuntimeClock clock = buildClock(LogicalTimer.class)) {
-
-      new Thread(clock).start();
-
-      final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
-
-      clock.scheduleAlarm(500, alarmRecorder);
-      clock.scheduleAlarm(500, alarmRecorder);
-
-      eventCountLatch.await(10, TimeUnit.SECONDS);
-
-      Assert.assertEquals(expectedEvent, alarmRecorder.getEventCount());
-    }
-  }
-
-  @Test
-  public void testAlarmOrder() throws Exception {
-
-    LoggingUtils.setLoggingLevel(Level.FINEST);
-
-    final int numAlarms = 10;
-    final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
-
-    try (final RuntimeClock clock = buildClock(LogicalTimer.class)) {
-
-      new Thread(clock).start();
-
-      final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
-
-      final long[] expected = new long[numAlarms];
-      for (int i = 0; i < numAlarms; ++i) {
-        clock.scheduleAlarm(i * 100, alarmRecorder);
-        expected[i] = i * 100;
-      }
-
-      eventCountLatch.await(10, TimeUnit.SECONDS);
-
-      int i = 0;
-      final long[] actual = new long[numAlarms];
-      for (final long ts : alarmRecorder.getTimestamps()) {
-        actual[i++] = ts;
-      }
-
-      Assert.assertArrayEquals(expected, actual);
-    }
-  }
-
-  /**
-   * An EventHandler that records the events that it sees.
-   */
-  private static class EventRecorder implements EventHandler<Alarm> {
-
-    /**
-     * A synchronized List of the events recorded by this EventRecorder.
-     */
-    private final List<Time> events = Collections.synchronizedList(new 
ArrayList<Time>());
-    private final List<Long> timestamps = Collections.synchronizedList(new 
ArrayList<Long>());
-
-    private final CountDownLatch eventCountLatch;
-
-    EventRecorder() {
-      this(null);
-    }
-
-    EventRecorder(final CountDownLatch latch) {
-      eventCountLatch = latch;
-    }
-
-    public int getEventCount() {
-      return events.size();
-    }
-
-    public List<Long> getTimestamps() {
-      return timestamps;
-    }
-
-    @Override
-    public void onNext(final Alarm event) {
-      timestamps.add(event.getTimestamp());
-      events.add(event);
-      if (eventCountLatch != null) {
-        eventCountLatch.countDown();
-      }
-    }
-  }
-
-  private static class RandomAlarmProducer implements EventHandler<Alarm> {
-
-    private final RuntimeClock clock;
-    private final CountDownLatch eventCountLatch;
-    private final Random rand;
-
-    RandomAlarmProducer(final RuntimeClock clock, final CountDownLatch latch) {
-      this.clock = clock;
-      this.eventCountLatch = latch;
-      this.rand = new Random();
-    }
-
-    @Override
-    public void onNext(final Alarm value) {
-      final int duration = rand.nextInt(100) + 1;
-      clock.scheduleAlarm(duration, this);
-      eventCountLatch.countDown();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/RuntimeClockTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/RuntimeClockTest.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/RuntimeClockTest.java
new file mode 100644
index 0000000..d3b2b11
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/RuntimeClockTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.test.time;
+
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.impl.LoggingUtils;
+import org.apache.reef.wake.impl.ThreadPoolStage;
+import org.apache.reef.wake.test.time.util.AlarmProducer;
+import org.apache.reef.wake.test.time.util.EventRecorder;
+import org.apache.reef.wake.time.Time;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.runtime.LogicalTimer;
+import org.apache.reef.wake.time.runtime.RealTimer;
+import org.apache.reef.wake.time.runtime.RuntimeClock;
+import org.apache.reef.wake.time.runtime.Timer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+
+/**
+ * Tests for RuntimeClock event loop.
+ */
+public class RuntimeClockTest {
+
+  private static final Tang TANG = Tang.Factory.getTang();
+
+  private final Random rand = new Random();
+
+  /**
+   * Create new RuntimeClock object injected with the given timer.
+   *
+   * @param timerClass Timer to use inside the RuntimeClock. Must implement 
the Timer interface.
+   * @return A new instance of the RuntimeClock, instrumented with the given 
timer.
+   * @throws InjectionException On configuration error.
+   */
+  private static RuntimeClock buildClock(
+      final Class<? extends Timer> timerClass) throws InjectionException {
+
+    final Configuration clockConfig = TANG.newConfigurationBuilder()
+        .bind(Timer.class, timerClass)
+        .build();
+
+    return TANG.newInjector(clockConfig).getInstance(RuntimeClock.class);
+  }
+
+  /**
+   * Create 10 threads to produce 40 alarms at random intervals
+   * and check if all alarms get processed.
+   * @throws Exception ThreadPoolStage can throw anything.
+   */
+  @Test
+  public void testClock() throws Exception {
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
+
+    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
+
+      new Thread(clock).start();
+
+      final CountDownLatch eventCountLatch = new CountDownLatch(40);
+      final AlarmProducer alarmProducer = new AlarmProducer(clock, 
eventCountLatch) {
+        @Override
+        public int getOffset() {
+          return randomOffsetUniform(rand, 1, 100);
+        }
+      };
+
+      try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 
10)) {
+        stage.onNext(null);
+        Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
+      }
+    }
+  }
+
+  @Test
+  public void testAlarmRegistrationRaceConditions() throws Exception {
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
+
+    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
+
+      new Thread(clock).start();
+
+      final EventRecorder earlierAlarmRecorder = new EventRecorder();
+      final EventRecorder laterAlarmRecorder = new EventRecorder();
+
+      // Schedule an Alarm that's far in the future
+      clock.scheduleAlarm(5000, laterAlarmRecorder);
+      Thread.sleep(1000);
+
+      // By now, RuntimeClockImpl should be in a timed wait() for 5000 ms.
+      // Scheduler an Alarm that should fire before the existing Alarm:
+      clock.scheduleAlarm(2000, earlierAlarmRecorder);
+      Thread.sleep(1000);
+
+      // The earlier Alarm shouldn't have fired yet (we've only slept 1/2 
time):
+      Assert.assertEquals(0, earlierAlarmRecorder.getEventCount());
+      Thread.sleep(1500);
+
+      // The earlier Alarm should have fired, since 3500 > 2000 ms have passed:
+      Assert.assertEquals(1, earlierAlarmRecorder.getEventCount());
+      // And the later Alarm shouldn't have fired yet:
+      Assert.assertEquals(0, laterAlarmRecorder.getEventCount());
+      Thread.sleep(2500);
+
+      // The later Alarm should have fired, since 6000 > 5000 ms have passed:
+      Assert.assertEquals(1, laterAlarmRecorder.getEventCount());
+    }
+  }
+
+  @Test
+  public void testMultipleCloseCalls() throws Exception {
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
+
+    final int numThreads = 3;
+    final CountDownLatch eventCountLatch = new CountDownLatch(numThreads);
+
+    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
+
+      final EventHandler<Alarm> handler = new EventHandler<Alarm>() {
+        @Override
+        public void onNext(final Alarm value) {
+          clock.close();
+          eventCountLatch.countDown();
+        }
+      };
+
+      new Thread(clock).start();
+
+      try (final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(handler, 
numThreads)) {
+
+        for (int i = 0; i < numThreads; ++i) {
+          stage.onNext(null);
+        }
+
+        Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS));
+      }
+    }
+  }
+
+  @Test
+  public void testSimultaneousAlarms() throws Exception {
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
+
+    final int expectedEvent = 2;
+    final CountDownLatch eventCountLatch = new CountDownLatch(expectedEvent);
+
+    try (final RuntimeClock clock = buildClock(LogicalTimer.class)) {
+
+      new Thread(clock).start();
+
+      final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
+
+      clock.scheduleAlarm(500, alarmRecorder);
+      clock.scheduleAlarm(500, alarmRecorder);
+
+      eventCountLatch.await(10, TimeUnit.SECONDS);
+
+      Assert.assertEquals(expectedEvent, alarmRecorder.getEventCount());
+    }
+  }
+
+  @Test
+  public void testAlarmOrder() throws Exception {
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
+
+    final int numAlarms = 10;
+    final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
+    final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
+
+    final long[] expected = new long[numAlarms];
+
+    try (final RuntimeClock clock = buildClock(RealTimer.class)) {
+
+      new Thread(clock).start();
+
+      for (int i = 0; i < numAlarms; ++i) {
+        final Time event = clock.scheduleAlarm(i * 100, alarmRecorder);
+        expected[i] = event.getTimestamp();
+      }
+    }
+
+    eventCountLatch.await(10, TimeUnit.SECONDS);
+
+    int i = 0;
+    final long[] actual = new long[numAlarms];
+    for (final Time event : alarmRecorder.getEvents()) {
+      actual[i++] = event.getTimestamp();
+    }
+
+    Assert.assertEquals(
+        "Number of alarms does not match the expected count",
+        numAlarms, alarmRecorder.getEventCount());
+
+    Assert.assertArrayEquals("Alarms processed in the wrong order", expected, 
actual);
+  }
+
+  /**
+   * Test graceful shutdown of the event loop.
+   * Schedule two events and close the clock. Make sure that no events occur 
soon after
+   * closing the alarm and both of them occur at the scheduled time. Check 
that the clock
+   * is closed after that.
+   * @throws InjectionException Error building a runtime clock object.
+   * @throws InterruptedException Sleep interrupted.
+   */
+  @Test
+  public void testGracefulClose() throws InjectionException, 
InterruptedException {
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
+
+    final int numAlarms = 2;
+    final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
+    final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
+
+    final RuntimeClock clock = buildClock(RealTimer.class);
+    new Thread(clock).start();
+
+    clock.scheduleAlarm(100, alarmRecorder);
+    clock.scheduleAlarm(101, alarmRecorder);
+    clock.close();
+
+    Assert.assertFalse("Clock cannot be idle yet", clock.isIdle());
+    Assert.assertTrue("Clock must be in closed state", clock.isClosed());
+
+    Thread.sleep(10);
+    Assert.assertTrue(
+        "No events should occur immediately after the graceful shutdown",
+        alarmRecorder.getEvents().isEmpty());
+
+    Thread.sleep(200);
+    final List<Time> events = alarmRecorder.getEvents();
+    Assert.assertEquals("Expected events on graceful shutdown", events.size(), 
numAlarms);
+
+    Assert.assertTrue("No client alarms should be scheduled at this time", 
clock.isIdle());
+    Assert.assertTrue("Clock must be in closed state", clock.isClosed());
+  }
+
+  /**
+   * Test forceful shutdown of the event loop. Schedule two events and close 
the clock.
+   * Make sure that no events occur after that and the clock is in closed and 
idle state.
+   * @throws InjectionException Error building a runtime clock object.
+   * @throws InterruptedException Sleep interrupted.
+   */
+  @Test
+  public void testForcefulStop() throws InjectionException, 
InterruptedException {
+
+    LoggingUtils.setLoggingLevel(Level.FINEST);
+
+    final int numAlarms = 2;
+    final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms);
+    final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch);
+
+    final RuntimeClock clock = buildClock(RealTimer.class);
+    new Thread(clock).start();
+
+    clock.scheduleAlarm(100, alarmRecorder);
+    clock.scheduleAlarm(101, alarmRecorder);
+    clock.stop();
+
+    Assert.assertTrue("Clock must be idle already", clock.isIdle());
+    Assert.assertTrue("Clock must be in closed state", clock.isClosed());
+
+    Thread.sleep(200);
+    Assert.assertTrue("No events should be in the schedule", 
alarmRecorder.getEvents().isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/AlarmProducer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/AlarmProducer.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/AlarmProducer.java
new file mode 100644
index 0000000..966d039
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/AlarmProducer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.test.time.util;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.event.Alarm;
+import org.apache.reef.wake.time.runtime.RuntimeClock;
+
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Helper class used in unit tests to generate alarms at user-specified 
intervals
+ * and count down the barrier on each alarm. It is used in RuntimeClockTest.
+ */
+public abstract class AlarmProducer implements EventHandler<Alarm> {
+
+  private final RuntimeClock clock;
+  private final CountDownLatch eventCountLatch;
+
+  /**
+   * Create a new alarm producer.
+   * @param clock Event loop that processes the schedule and invokes alarm 
handlers.
+   * @param latch A barrier with the counter that gets decremented after each 
alarm.
+   */
+  public AlarmProducer(final RuntimeClock clock, final CountDownLatch latch) {
+    this.clock = clock;
+    this.eventCountLatch = latch;
+  }
+
+  /**
+   * On each alarm, schedule the next one and decrement the latch.
+   * @param value An alarm to process.
+   */
+  @Override
+  public void onNext(final Alarm value) {
+    this.clock.scheduleAlarm(this.getOffset(), this);
+    this.eventCountLatch.countDown();
+  }
+
+  /**
+   * Return offset from current time in milliseconds for when to schedule an 
alarm.
+   * @return Time offset in milliseconds. Must be a positive number.
+   */
+  public abstract int getOffset();
+
+  /**
+   * Generate random integer uniformly distributed between offsetMin and 
offsetMax.
+   * Helper function to be used in getOffset().
+   * @param rand Random number generator, and instance of Random.
+   * @param offsetMin Lower bound, must be > 0.
+   * @param offsetMax Upper bound, must be > offsetMin.
+   * @return Random integer uniformly distributed between offsetMin and 
offsetMax.
+   */
+  public static int randomOffsetUniform(final Random rand, final int 
offsetMin, final int offsetMax) {
+    assert offsetMin > 0;
+    assert offsetMin <= offsetMax;
+    return rand.nextInt(offsetMax - offsetMin + 1) + offsetMin;
+  }
+
+  /**
+   * Generate random integer drawn from Poisson distribution.
+   * Helper function to be used in getOffset(). Always returns a positive 
integer.
+   * We use normal distribution with mu=lambda and sigma=sqrt(lambda) to 
approximate
+   * the Poisson distribution.
+   * @param rand Random number generator, and instance of Random.
+   * @param lambda Parameter of the Poisson distribution, must be > 0.
+   * @return A rounded absolute value of a random number drawn from a Poisson 
distribution.
+   */
+  public static int randomOffsetPoisson(final Random rand, final double 
lambda) {
+    assert lambda > 0;
+    return (int)Math.round(Math.abs(rand.nextGaussian() * Math.sqrt(lambda) + 
lambda) + 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/EventRecorder.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/EventRecorder.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/EventRecorder.java
new file mode 100644
index 0000000..0e63599
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/EventRecorder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.test.time.util;
+
+import org.apache.reef.wake.EventHandler;
+import org.apache.reef.wake.time.Time;
+import org.apache.reef.wake.time.event.Alarm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * An EventHandler that records, in arrival order, the alarms that it sees.
+ * Can optionally count down a latch on each alarm it receives.
+ * This is a helper class to be used in unit tests, e.g. RuntimeClockTest.
+ */
+public final class EventRecorder implements EventHandler<Alarm> {
+
+  /**
+   * A synchronized List of the events recorded by this EventRecorder, held as Time references.
+   */
+  private final List<Time> events = Collections.synchronizedList(new 
ArrayList<Time>());
+
+  private final CountDownLatch eventCountLatch;
+
+  public EventRecorder() {
+    this(null);
+  }
+
+  /**
+   * Create a new event recorder.
+   * If latch is not null, it is counted down once per event received.
+   * @param latch A count down latch. Can be null, in which case no latch is used.
+   */
+  public EventRecorder(final CountDownLatch latch) {
+    this.eventCountLatch = latch;
+  }
+
+  /**
+   * Get the number of events captured so far.
+   * @return The current size of the recorded events list.
+   */
+  public int getEventCount() {
+    return this.events.size();
+  }
+
+  /**
+   * Get the list of events captured so far, in the order they were captured.
+   * @return The recorder's own synchronized list (not a copy). It can be empty, but never null.
+   */
+  public List<Time> getEvents() {
+    return this.events;
+  }
+
+  /**
+   * Add a new event to the list, then count down the latch if one was supplied.
+   * @param event An event to capture.
+   */
+  @Override
+  public void onNext(final Alarm event) {
+    this.events.add(event);
+    if (this.eventCountLatch != null) {
+      this.eventCountLatch.countDown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/65649348/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/package-info.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/package-info.java
 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/package-info.java
new file mode 100644
index 0000000..6677e2e
--- /dev/null
+++ 
b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Helper classes for unit tests in org.apache.reef.wake.test.time package.
+ */
+package org.apache.reef.wake.test.time.util;

Reply via email to