Repository: reef Updated Branches: refs/heads/master 44bb76eb7 -> 8e140c720
[REEF-1527] Refactor RuntimeClock and implement graceful shutdown correctly. * Refactor RuntimeClock for readability and add more Javadoc comments; * Fix the bug in RuntimeClock.close() that clears all scheduled events without processing them; * Clean up and refactor code for Time and derived classes, add Javadocs; * Improve logging in RuntimeClock and around; * Cosmetic refactoring of RuntimeClock unit tests (new unit tests will be in a separate pull request) * Avoid loops over entire schedule in .close() and .isIdle() methods to find client alarms. JIRA: [REEF-1527](https://issues.apache.org/jira/browse/REEF-1527) close Pull Request: This closes #1096 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/8e140c72 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/8e140c72 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/8e140c72 Branch: refs/heads/master Commit: 8e140c72083cfe7747436d84dfbe6c1a8426477f Parents: 44bb76e Author: Sergiy Matusevych <[email protected]> Authored: Fri Aug 12 14:07:04 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Aug 17 18:48:14 2016 -0700 ---------------------------------------------------------------------- .../reef/javabridge/generic/JobDriver.java | 4 +- .../reef/runtime/common/REEFLauncher.java | 1 + .../reef/io/watcher/util/WatcherAvroUtil.java | 8 +- .../java/org/apache/reef/wake/time/Time.java | 56 ++-- .../org/apache/reef/wake/time/event/Alarm.java | 12 +- .../reef/wake/time/runtime/LogicalTimer.java | 81 ++++- .../reef/wake/time/runtime/RealTimer.java | 53 +++- .../reef/wake/time/runtime/RuntimeClock.java | 292 ++++++++++++------- .../apache/reef/wake/time/runtime/Timer.java | 44 +++ .../wake/time/runtime/event/ClientAlarm.java | 1 + .../apache/reef/wake/test/time/ClockTest.java | 140 +++++---- .../reef/webserver/ReefEventStateManager.java | 4 +- .../reef/webserver/TestHttpConfiguration.java | 4 +- 13 files changed, 478 
insertions(+), 222 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java index c0d1ef5..b0c8d41 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/generic/JobDriver.java @@ -618,7 +618,7 @@ public final class JobDriver { LOG.log(Level.INFO, "Java DriverRestartCompleted event received at time [{0}]. ", driverRestartCompleted.getCompletedTime()); try (final LoggingScope ls = loggingScopeFactory.driverRestartCompleted( - driverRestartCompleted.getCompletedTime().getTimeStamp())) { + driverRestartCompleted.getCompletedTime().getTimestamp())) { if (JobDriver.this.handlerManager.getDriverRestartCompletedHandler() != 0) { LOG.log(Level.INFO, "CLR driver restart handler implemented, now handle it in CLR."); @@ -639,7 +639,7 @@ public final class JobDriver { @Override public void onNext(final StopTime time) { LOG.log(Level.INFO, " StopTime: {0}", new Object[]{time}); - try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimeStamp())) { + try (final LoggingScope ls = loggingScopeFactory.driverStop(time.getTimestamp())) { for (final ActiveContext context : contexts.values()) { context.close(); } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java old mode 100755 new mode 100644 index dfe693f..3ad5fe4 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/REEFLauncher.java @@ -76,6 +76,7 @@ public final class REEFLauncher { LoggingSetup.setupCommonsLogging(); } + /** Config parameter to turn on network IO profiling in Wake. */ private final boolean isWakeProfilingEnabled; /** REEF version - we need it simply to write it to the log. */ http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java index 3fd6ffc..416a4f6 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/watcher/util/WatcherAvroUtil.java @@ -151,26 +151,26 @@ public final class WatcherAvroUtil { public static AvroRuntimeStart toAvroRuntimeStart(final RuntimeStart runtimeStart) { return AvroRuntimeStart.newBuilder() - .setTimestamp(runtimeStart.getTimeStamp()) + .setTimestamp(runtimeStart.getTimestamp()) .build(); } public static AvroStartTime toAvroStartTime(final StartTime startTime) { return AvroStartTime.newBuilder() - .setTimestamp(startTime.getTimeStamp()) + .setTimestamp(startTime.getTimestamp()) .build(); } public static AvroStopTime toAvroStopTime(final StopTime stopTime) { return AvroStopTime.newBuilder() - .setTimestamp(stopTime.getTimeStamp()) + .setTimestamp(stopTime.getTimestamp()) .build(); } public static 
AvroRuntimeStop toAvroRuntimeStop(final RuntimeStop runtimeStop) { return AvroRuntimeStop.newBuilder() .setException(convertThrowableToString(runtimeStop.getException())) - .setTimestamp(runtimeStop.getTimeStamp()) + .setTimestamp(runtimeStop.getTimestamp()) .build(); } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java index 44e90b5..0150962 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java @@ -18,53 +18,63 @@ */ package org.apache.reef.wake.time; +import java.util.Date; + /** - * Time object. + * An abstract object that has a timestamp. + * That allows us to compare and order objects by the time. */ public abstract class Time implements Comparable<Time> { private final long timestamp; + /** + * Initialize the internal timestamp. Timestamp remains constant + * for the entire lifecycle of the object. + * @param timestamp timestamp in milliseconds since the beginning + * of the epoch (01/01/1970). + */ public Time(final long timestamp) { this.timestamp = timestamp; } + /** + * Get timestamp in milliseconds since the beginning of the epoch (01/01/1970). + * @return Object's timestamp in milliseconds since the start of the epoch. + */ + public final long getTimestamp() { + return this.timestamp; + } + + /** + * Get timestamp in milliseconds since the beginning of the epoch (01/01/1970). + * @return Object's timestamp in milliseconds since the start of the epoch. + * @deprecated [REEF-1532] Prefer using getTimestamp() instead. + * Remove after release 0.16. 
+ */ public final long getTimeStamp() { return this.timestamp; } @Override - public final String toString() { - return this.getClass().getName() + "[" + this.timestamp + "]"; + public String toString() { + return this.getClass().getName() + + ":[" + this.timestamp + '|' + new Date(this.timestamp) + ']'; } @Override - public final int compareTo(final Time o) { - if (this.timestamp < o.timestamp) { - return -1; - } - if (this.timestamp > o.timestamp) { - return 1; - } - if (this.hashCode() < o.hashCode()) { - return -1; - } - if (this.hashCode() > o.hashCode()) { - return 1; - } - return 0; + public int compareTo(final Time other) { + final int cmp = Long.compare(this.timestamp, other.timestamp); + return cmp != 0 ? cmp : Integer.compare(this.hashCode(), other.hashCode()); } @Override - public final boolean equals(final Object o) { - if (o instanceof Time) { - return compareTo((Time) o) == 0; - } - return false; + public boolean equals(final Object other) { + return other instanceof Time && compareTo((Time) other) == 0; } @Override - public final int hashCode() { + public int hashCode() { return super.hashCode(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java index fb278a4..ac1ed6d 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/event/Alarm.java @@ -22,9 +22,10 @@ import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.time.Time; /** - * Represents a timer event. + * An alarm is a timer event to be invoked at a given time. + * Contains a (future) timestamp and the event handler to invoke. 
+ */ -public abstract class Alarm extends Time { +public abstract class Alarm extends Time implements Runnable { private final EventHandler<Alarm> handler; @@ -33,8 +34,11 @@ public abstract class Alarm extends Time { this.handler = handler; } - public final void handle() { + /** + * Invoke the event handler and pass a reference to self as a parameter. + */ + @Override + public final void run() { this.handler.onNext(this); } - } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java index 72d558e..1ae2ed8 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java @@ -18,36 +18,101 @@ */ package org.apache.reef.wake.time.runtime; +import org.apache.reef.wake.time.Time; + import javax.inject.Inject; +import java.util.concurrent.atomic.AtomicLong; /** - * Logical timer. + * Logical timer that is only bound to the timestamps of the events tracked against it. + * In such a setting, all events occur immediately, i.e. isReady() always returns true, + * and the duration of the delay to the next event is always 0. + * Current time for this timer is always the timestamp of the tracked event that is + * the most distant in the future. */ public final class LogicalTimer implements Timer { - private long current = 0; + /** + * Current time in milliseconds since the beginning of the epoch (01/01/1970), + * according to the timer. For this implementation, always keep the largest seen + * timestamp (i.e. track the event that is the most distant into the future). 
+ */ + private final AtomicLong current = new AtomicLong(0); + /** + * Instances of the timer should only be created automatically by Tang. + */ @Inject - LogicalTimer() { + private LogicalTimer() { } + /** + * Get current time in milliseconds since the beginning of the epoch (01/01/1970). + * This timer implementation always returns the timestamp of the most distant + * future event ever checked against this timer in getDuration() or isReady() methods. + * Return 0 if there were no calls yet to getDuration() or isReady(). + * @return Timestamp of the latest event (in milliseconds since the start of the epoch). + */ @Override public long getCurrent() { - return this.current; + return this.current.get(); } + /** + * Get the number of milliseconds between current time as tracked by the Timer implementation + * and a given event. This implementation always returns 0 and updates current timer's time + * to the timestamp of the most distant future event. + * @param time Timestamp in milliseconds. + * @return Always returns 0. + * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp. + * Remove after release 0.16. + */ @Override public long getDuration(final long time) { - isReady(time); + this.isReady(time); return 0; } + /** + * Get the number of milliseconds between current time as tracked by the Timer implementation + * and a given event. This implementation always returns 0 and updates current timer's time + * to the timestamp of the most distant future event. + * @param time Timestamp object that wraps time in milliseconds. + * @return Always returns 0. + */ + @Override + public long getDuration(final Time time) { + return this.getDuration(time.getTimestamp()); + } + + /** + * Check if the event with a given timestamp has occurred, according to the timer. + * This implementation always returns true and updates current timer's time to the timestamp + * of the most distant future event. + * @param time Timestamp in milliseconds. 
+ * @return Always returns true. + * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp. + * Remove after release 0.16. + */ @Override public boolean isReady(final long time) { - if (this.current < time) { - this.current = time; + while (true) { + final long thisTs = this.current.get(); + if (thisTs >= time || this.current.compareAndSet(thisTs, time)) { + return true; + } } - return true; } + /** + * Check if the event with a given timestamp has occurred, according to the timer. + * This implementation always returns true and updates current timer's time to the timestamp + * of the most distant future event. + * @param time Timestamp object that wraps time in milliseconds. + * @return Always returns true. + */ + @Override + public boolean isReady(final Time time) { + return this.isReady(time.getTimestamp()); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java index 09ab817..402644a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RealTimer.java @@ -18,29 +18,78 @@ */ package org.apache.reef.wake.time.runtime; +import org.apache.reef.wake.time.Time; + import javax.inject.Inject; /** - * A system-time based Timer. + * Implementation of the Timer that uses the system clock. */ public final class RealTimer implements Timer { + /** + * Instances of the timer should only be created automatically by Tang. + */ @Inject - public RealTimer() { + private RealTimer() { } + /** + * Get current time in milliseconds since the beginning of the epoch (01/01/1970). 
+ * @return Current system time in milliseconds since the start of the epoch. + */ @Override public long getCurrent() { return System.currentTimeMillis(); } + /** + * Get the number of milliseconds from current system time to the given event. + * Can return a negative number if the event is already in the past. + * @param time Timestamp in milliseconds. + * @return Difference in milliseconds between the given timestamp and current system time. + * The result is a negative number if the timestamp is in the past. + * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp. + * Remove after release 0.16. + */ @Override public long getDuration(final long time) { return time - getCurrent(); } + /** + * Get the number of milliseconds from current system time to the given event. + * Can return a negative number if the event is already in the past. + * @param time Timestamp object that wraps time in milliseconds. + * @return Difference in milliseconds between the given timestamp and current system time. + * The result is a negative number if the timestamp is in the past. + */ + @Override + public long getDuration(final Time time) { + return time.getTimestamp() - getCurrent(); + } + + /** + * Check if the event with a given timestamp has occurred. Return true if the timestamp + * equals or less than the current system time, and false if it is still in the future. + * @param time Timestamp in milliseconds. + * @return False if the given timestamp is still in the future. + * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp. + * Remove after release 0.16. + */ @Override public boolean isReady(final long time) { return getDuration(time) <= 0; } + + /** + * Check if the event with a given timestamp has occurred. Return true if the timestamp + * equals or less than the current system time, and false if it is still in the future. + * @param time Timestamp object that wraps time in milliseconds. 
+ * @return False if the given timestamp is still in the future. + */ + @Override + public boolean isReady(final Time time) { + return getDuration(time) <= 0; + } } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java index a0ab566..43a25ac 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java @@ -44,13 +44,23 @@ import java.util.logging.Logger; */ public final class RuntimeClock implements Clock { - private static final Logger LOG = Logger.getLogger(Clock.class.toString()); + private static final Logger LOG = Logger.getLogger(RuntimeClock.class.getName()); + private static final String CLASS_NAME = RuntimeClock.class.getCanonicalName(); + /** + * Injectable source of current time information. + * Usually an instance of RealTimer that wraps the system clock. + */ private final Timer timer; - private final TreeSet<Time> schedule; + /** + * An ordered set of timed objects, in ascending order of their timestamps. + * It also serves as the main synchronization monitor for the class. + */ + private final TreeSet<Time> schedule = new TreeSet<>(); - private final PubSubEventHandler<Time> handlers; + /** Event handlers - populated with the injectable parameters provided to the RuntimeClock constructor. 
*/ + private final PubSubEventHandler<Time> handlers = new PubSubEventHandler<>(); private final InjectionFuture<Set<EventHandler<StartTime>>> startHandler; private final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler; @@ -58,123 +68,171 @@ public final class RuntimeClock implements Clock { private final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler; private final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler; - private Throwable stoppedOnException; - private boolean closed = false; + /** + * Timestamp of the last client alarm in the schedule. + * We use it to schedule a graceful shutdown event immediately after all client alarms. + */ + private long lastClientAlarm = 0; + + /** + * Number of client alarms in the schedule. + * We need it to determine whether event loop is idle (i.e. has no client alarms scheduled) + */ + private int numClientAlarms = 0; + + /** Set to true when the clock is closed. */ + private boolean isClosed = false; + + /** Exception that caused the clock to stop. 
*/ + private Throwable exceptionCausedStop = null; @Inject - RuntimeClock(final Timer timer, - @Parameter(Clock.StartHandler.class) final InjectionFuture<Set<EventHandler<StartTime>>> startHandler, - @Parameter(StopHandler.class) final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler, - @Parameter(Clock.RuntimeStartHandler.class) - final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler, - @Parameter(Clock.RuntimeStopHandler.class) - final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler, - @Parameter(IdleHandler.class) final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) { - this.timer = timer; - this.schedule = new TreeSet<>(); - this.handlers = new PubSubEventHandler<>(); + private RuntimeClock( + final Timer timer, + @Parameter(Clock.StartHandler.class) + final InjectionFuture<Set<EventHandler<StartTime>>> startHandler, + @Parameter(Clock.StopHandler.class) + final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler, + @Parameter(Clock.RuntimeStartHandler.class) + final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler, + @Parameter(Clock.RuntimeStopHandler.class) + final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler, + @Parameter(Clock.IdleHandler.class) + final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) { + this.timer = timer; this.startHandler = startHandler; this.stopHandler = stopHandler; this.runtimeStartHandler = runtimeStartHandler; this.runtimeStopHandler = runtimeStopHandler; this.idleHandler = idleHandler; - this.stoppedOnException = null; - LOG.log(Level.FINE, "RuntimeClock instantiated."); } + /** + * Schedule a new Alarm event in `offset` milliseconds into the future, + * and supply an event handler to be called at that time. + * @param offset Number of milliseconds into the future relative to current time. + * @param handler Event handler to be invoked. + * @throws IllegalStateException if the clock is already closed. 
+ */ @Override public void scheduleAlarm(final int offset, final EventHandler<Alarm> handler) { + + final Time alarm = new ClientAlarm(this.timer.getCurrent() + offset, handler); + + if (LOG.isLoggable(Level.FINEST)) { + LOG.log(Level.FINEST, "Schedule alarm: {0}", alarm); + } + synchronized (this.schedule) { - if (this.closed) { + + if (this.isClosed) { throw new IllegalStateException("Scheduling alarm on a closed clock"); } - this.schedule.add(new ClientAlarm(this.timer.getCurrent() + offset, handler)); - this.schedule.notifyAll(); - } - } + if (alarm.getTimestamp() > this.lastClientAlarm) { + this.lastClientAlarm = alarm.getTimestamp(); + } - public void registerEventHandler(final Class<? extends Time> clazz, final EventHandler<Time> handler) { - this.handlers.subscribe(clazz, handler); - } + assert this.numClientAlarms >= 0; + ++this.numClientAlarms; - public void scheduleRuntimeAlarm(final int offset, final EventHandler<Alarm> handler) { - synchronized (this.schedule) { - this.schedule.add(new RuntimeAlarm(this.timer.getCurrent() + offset, handler)); + this.schedule.add(alarm); this.schedule.notifyAll(); } } + /** + * Stop the clock. Remove all other events from the schedule and fire StopTime + * event immediately. It is recommended to use close() method for graceful shutdown + * instead of stop(). + */ @Override public void stop() { this.stop(null); } + /** + * Stop the clock on exception. + * Remove all other events from the schedule and fire StopTime event immediately. + * @param exception Exception that is the cause for the stop. Can be null. 
+ */ @Override - public void stop(final Throwable stopOnException) { - LOG.entering(RuntimeClock.class.getCanonicalName(), "stop"); + public void stop(final Throwable exception) { + + LOG.entering(CLASS_NAME, "stop"); + synchronized (this.schedule) { + + if (this.isClosed) { + LOG.log(Level.FINEST, "Clock has already been closed"); + return; + } + + this.isClosed = true; + this.exceptionCausedStop = exception; + + final Time stopEvent = new StopTime(this.timer.getCurrent()); + LOG.log(Level.FINE, "Stop scheduled immediately: {0}", stopEvent); + + this.numClientAlarms = 0; + assert this.numClientAlarms >= 0; + this.schedule.clear(); - this.schedule.add(new StopTime(timer.getCurrent())); + this.schedule.add(stopEvent); this.schedule.notifyAll(); - this.closed = true; - if (this.stoppedOnException == null) { - this.stoppedOnException = stopOnException; - } } - LOG.exiting(RuntimeClock.class.getCanonicalName(), "stop"); + + LOG.exiting(CLASS_NAME, "stop"); } + /** + * Wait for all client alarms to finish executing and gracefully shutdown the clock. 
+ */ @Override public void close() { - LOG.entering(RuntimeClock.class.getCanonicalName(), "close"); + + LOG.entering(CLASS_NAME, "close"); + synchronized (this.schedule) { - if (this.closed) { - LOG.log(Level.INFO, "Clock is already closed"); + + if (this.isClosed) { + LOG.log(Level.FINEST, "Clock has already been closed"); return; } - this.schedule.clear(); - this.schedule.add(new StopTime(findAcceptableStopTime())); + + this.isClosed = true; + + final Time stopEvent = new StopTime(Math.max(this.timer.getCurrent(), this.lastClientAlarm + 1)); + LOG.log(Level.FINE, "Graceful shutdown scheduled: {0}", stopEvent); + + this.schedule.add(stopEvent); this.schedule.notifyAll(); - this.closed = true; - LOG.log(Level.INFO, "Clock.close()"); } - LOG.exiting(RuntimeClock.class.getCanonicalName(), "close"); + + LOG.exiting(CLASS_NAME, "close"); } /** - * Finds an acceptable stop time, which is the - * a time beyond that of any client alarm. - * - * @return an acceptable stop time + * Check if there are no client alarms scheduled. + * @return True if there are no client alarms in the schedule, false otherwise. */ - private long findAcceptableStopTime() { - long time = timer.getCurrent(); - for (final Time t : this.schedule) { - if (t instanceof ClientAlarm) { - assert time <= t.getTimeStamp(); - time = t.getTimeStamp(); - } - } - return time + 1; - } - - @Override public boolean isIdle() { synchronized (this.schedule) { - for (final Time t : this.schedule) { - if (t instanceof ClientAlarm) { - return false; - } - } - return true; + assert this.numClientAlarms >= 0; + return this.numClientAlarms == 0; } } + /** + * Register event handlers for the given event class. + * @param eventClass Event type to handle. Must be derived from Time. + * @param handlers One or many event handlers that can process given event type. + * @param <T> Event type - must be derived from class Time. (i.e. contain a timestamp). 
+ */ @SuppressWarnings("checkstyle:hiddenfield") private <T extends Time> void subscribe(final Class<T> eventClass, final Set<EventHandler<T>> handlers) { for (final EventHandler<T> handler : handlers) { @@ -184,25 +242,35 @@ public final class RuntimeClock implements Clock { /** * Logs the currently running threads. - * - * @param level the level used for the log entry + * @param level Log level used to write the entry. * @param prefix put before the comma-separated list of threads */ - private void logThreads(final Level level, final String prefix) { - final StringBuilder sb = new StringBuilder(prefix); - for (final Thread t : Thread.getAllStackTraces().keySet()) { - sb.append(t.getName()); - sb.append(", "); + private static void logThreads(final Level level, final String prefix) { + + if (LOG.isLoggable(level)) { + + final StringBuilder sb = new StringBuilder(prefix); + for (final Thread t : Thread.getAllStackTraces().keySet()) { + sb.append(t.getName()).append(", "); + } + + LOG.log(level, sb.toString()); } - LOG.log(level, sb.toString()); } + /** + * Main event loop. + * Set up the event handlers, and go into event loop that polls the schedule and process events in it. 
+ */ @Override public void run() { - LOG.entering(RuntimeClock.class.getCanonicalName(), "run"); + + LOG.entering(CLASS_NAME, "run"); try { + LOG.log(Level.FINE, "Subscribe event handlers"); + subscribe(StartTime.class, this.startHandler.get()); subscribe(StopTime.class, this.stopHandler.get()); subscribe(RuntimeStart.class, this.runtimeStartHandler.get()); @@ -213,66 +281,82 @@ public final class RuntimeClock implements Clock { this.handlers.onNext(new RuntimeStart(this.timer.getCurrent())); LOG.log(Level.FINE, "Initiate start time"); - final StartTime start = new StartTime(this.timer.getCurrent()); - this.handlers.onNext(start); + this.handlers.onNext(new StartTime(this.timer.getCurrent())); while (true) { - LOG.log(Level.FINEST, "Entering clock main loop iteration."); + + LOG.log(Level.FINEST, "Enter clock main loop."); + try { + if (this.isIdle()) { // Handle an idle clock event, without locking this.schedule this.handlers.onNext(new IdleClock(timer.getCurrent())); } - Time time = null; + final Time event; synchronized (this.schedule) { + while (this.schedule.isEmpty()) { this.schedule.wait(); } assert this.schedule.first() != null; - // Wait until the first scheduled time is ready - for (long duration = this.timer.getDuration(this.schedule.first().getTimeStamp()); - duration > 0; - duration = this.timer.getDuration(this.schedule.first().getTimeStamp())) { - // note: while I'm waiting, another alarm could be scheduled with a shorter duration - // so the next time I go around the loop I need to revise my duration - this.schedule.wait(duration); + // Wait until the first scheduled time is ready. + // NOTE: while waiting, another alarm could be scheduled with a shorter duration + // so the next time I go around the loop I need to revise my duration. 
+ while (true) { + final long waitDuration = this.timer.getDuration(this.schedule.first()); + if (waitDuration <= 0) { + break; + } + this.schedule.wait(waitDuration); } + // Remove the event from the schedule and process it: - time = this.schedule.pollFirst(); - assert time != null; + event = this.schedule.pollFirst(); } - if (time instanceof Alarm) { - final Alarm alarm = (Alarm) time; - alarm.handle(); + LOG.log(Level.FINER, "Process event: {0}", event); + assert event != null; + + if (event instanceof Alarm) { + + if (event instanceof ClientAlarm) { + --this.numClientAlarms; + assert this.numClientAlarms >= 0; + } + + ((Alarm) event).run(); + } else { - this.handlers.onNext(time); - if (time instanceof StopTime) { + + this.handlers.onNext(event); + + if (event instanceof StopTime) { break; // we're done. } } + } catch (final InterruptedException expected) { - // waiting interrupted - return to loop + LOG.log(Level.FINEST, "Wait interrupted; continue event loop."); } } - if (this.stoppedOnException == null) { - this.handlers.onNext(new RuntimeStop(this.timer.getCurrent())); - } else { - this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.stoppedOnException)); - } + + this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), this.exceptionCausedStop)); + } catch (final Exception e) { - e.printStackTrace(); + + LOG.log(Level.SEVERE, "Error in runtime clock", e); this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e)); + } finally { + logThreads(Level.FINE, "Threads running after exiting the clock main loop: "); LOG.log(Level.FINE, "Runtime clock exit"); } - LOG.exiting(RuntimeClock.class.getCanonicalName(), "run"); + LOG.exiting(CLASS_NAME, "run"); } - - } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java index 9d97aae..1935938 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/Timer.java @@ -19,15 +19,59 @@ package org.apache.reef.wake.time.runtime; import org.apache.reef.tang.annotations.DefaultImplementation; +import org.apache.reef.wake.time.Time; /** * An interface for Timer. + * Default implementation uses actual system time. */ @DefaultImplementation(RealTimer.class) public interface Timer { + + /** + * Get current time in milliseconds since the beginning of the epoch (01/01/1970). + * Note that this time may not necessarily match the actual system time - e.g. in unit tests. + * @return Current system time in milliseconds since the start of the epoch. + */ long getCurrent(); + /** + * Get the number of milliseconds between current time as tracked by the Timer implementation + * and the given event. Can return a negative number if the event is already in the past. + * @param time Timestamp in milliseconds. + * @return Difference in milliseconds between the given timestamp and the time tracked by the timer. + * The result is a negative number if the timestamp is in the past (according to the timer's time). + * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp. + * Remove after release 0.16. + */ long getDuration(final long time); + /** + * Get the number of milliseconds between current time as tracked by the Timer implementation + * and the given event. Can return a negative number if the event is already in the past. + * @param time Timestamp object that wraps time in milliseconds. + * @return Difference in milliseconds between the given timestamp and the time tracked by the timer. 
+ * The result is a negative number if the timestamp is in the past (according to the timer's time). + */ + long getDuration(final Time time); + + /** + * Check if the event with a given timestamp has occurred, according to the timer. + * Return true if the timestamp is equal or less than the timer's time, and false if + * it is still in the (timer's) future. + * @param time Timestamp in milliseconds. + * @return False if the given timestamp is still in the timer's time future. + * @deprecated [REEF-1532] Prefer passing Time object instead of the numeric timestamp. + * Remove after release 0.16. + */ boolean isReady(final long time); + + /** + * Check if the event with a given timestamp has occurred, according to the timer. + * Return true if the timestamp is equal or less than the timer's time, and false if + * it is still in the (timer's) future. + * @param time Timestamp object that wraps time in milliseconds. + * @return False if the given timestamp is still in the timer's time future. + */ + boolean isReady(final Time time); } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java index 81c4802..3671cf3 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/event/ClientAlarm.java @@ -23,6 +23,7 @@ import org.apache.reef.wake.time.event.Alarm; /** * An event for client-created alarm. + * Contains a timestamp and the event handler to invoke at that time. 
*/ public final class ClientAlarm extends Alarm { http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java index c444205..afb6866 100644 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java +++ b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/time/ClockTest.java @@ -18,15 +18,16 @@ */ package org.apache.reef.wake.test.time; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.LoggingUtils; import org.apache.reef.wake.impl.ThreadPoolStage; import org.apache.reef.wake.time.Time; import org.apache.reef.wake.time.event.Alarm; import org.apache.reef.wake.time.runtime.LogicalTimer; +import org.apache.reef.wake.time.runtime.RealTimer; import org.apache.reef.wake.time.runtime.RuntimeClock; import org.apache.reef.wake.time.runtime.Timer; import org.junit.Assert; @@ -45,57 +46,51 @@ import java.util.logging.Level; */ public class ClockTest { - private static RuntimeClock buildClock() throws Exception { - final JavaConfigurationBuilder builder = Tang.Factory.getTang() - .newConfigurationBuilder(); + private static final Tang TANG = Tang.Factory.getTang(); - final Injector injector = Tang.Factory.getTang() - .newInjector(builder.build()); + private static RuntimeClock buildClock( + final Class<? 
extends Timer> timerClass) throws InjectionException { - return injector.getInstance(RuntimeClock.class); - } - - private static RuntimeClock buildLogicalClock() throws Exception { - final JavaConfigurationBuilder builder = Tang.Factory.getTang() - .newConfigurationBuilder(); + final Configuration clockConfig = TANG.newConfigurationBuilder() + .bind(Timer.class, timerClass) + .build(); - builder.bind(Timer.class, LogicalTimer.class); - - final Injector injector = Tang.Factory.getTang() - .newInjector(builder.build()); - return injector.getInstance(RuntimeClock.class); + return TANG.newInjector(clockConfig).getInstance(RuntimeClock.class); } @Test public void testClock() throws Exception { - LoggingUtils.setLoggingLevel(Level.FINE); + + LoggingUtils.setLoggingLevel(Level.FINEST); final int minEvents = 40; final CountDownLatch eventCountLatch = new CountDownLatch(minEvents); - final RuntimeClock clock = buildClock(); - new Thread(clock).start(); - final RandomAlarmProducer alarmProducer = new RandomAlarmProducer(clock, eventCountLatch); + try (final RuntimeClock clock = buildClock(RealTimer.class)) { - try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) { - stage.onNext(null); - Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS)); - } finally { - clock.close(); + new Thread(clock).start(); + + final RandomAlarmProducer alarmProducer = new RandomAlarmProducer(clock, eventCountLatch); + + try (ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(alarmProducer, 10)) { + stage.onNext(null); + Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS)); + } } } @Test public void testAlarmRegistrationRaceConditions() throws Exception { - LoggingUtils.setLoggingLevel(Level.FINE); - final RuntimeClock clock = buildClock(); - new Thread(clock).start(); + LoggingUtils.setLoggingLevel(Level.FINEST); + + try (final RuntimeClock clock = buildClock(RealTimer.class)) { - final EventRecorder earlierAlarmRecorder = new EventRecorder(); - final 
EventRecorder laterAlarmRecorder = new EventRecorder(); + new Thread(clock).start(); + + final EventRecorder earlierAlarmRecorder = new EventRecorder(); + final EventRecorder laterAlarmRecorder = new EventRecorder(); - try { // Schedule an Alarm that's far in the future clock.scheduleAlarm(5000, laterAlarmRecorder); Thread.sleep(1000); @@ -117,72 +112,77 @@ public class ClockTest { // The later Alarm should have fired, since 6000 > 5000 ms have passed: Assert.assertEquals(1, laterAlarmRecorder.getEventCount()); - } finally { - clock.close(); } } @Test public void testMultipleCloseCalls() throws Exception { - LoggingUtils.setLoggingLevel(Level.FINE); + + LoggingUtils.setLoggingLevel(Level.FINEST); final int numThreads = 3; final CountDownLatch eventCountLatch = new CountDownLatch(numThreads); - final RuntimeClock clock = buildClock(); - new Thread(clock).start(); - final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(new EventHandler<Alarm>() { - @Override - public void onNext(final Alarm value) { - clock.close(); - eventCountLatch.countDown(); - } - }, numThreads); + try (final RuntimeClock clock = buildClock(RealTimer.class)) { - try { - for (int i = 0; i < numThreads; ++i) { - stage.onNext(null); + final EventHandler<Alarm> handler = new EventHandler<Alarm>() { + @Override + public void onNext(final Alarm value) { + clock.close(); + eventCountLatch.countDown(); + } + }; + + new Thread(clock).start(); + + try (final ThreadPoolStage<Alarm> stage = new ThreadPoolStage<>(handler, numThreads)) { + + for (int i = 0; i < numThreads; ++i) { + stage.onNext(null); + } + + Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS)); } - Assert.assertTrue(eventCountLatch.await(10, TimeUnit.SECONDS)); - } finally { - stage.close(); - clock.close(); } } @Test public void testSimultaneousAlarms() throws Exception { - LoggingUtils.setLoggingLevel(Level.FINE); + + LoggingUtils.setLoggingLevel(Level.FINEST); final int expectedEvent = 2; final CountDownLatch 
eventCountLatch = new CountDownLatch(expectedEvent); - final RuntimeClock clock = buildLogicalClock(); - new Thread(clock).start(); + try (final RuntimeClock clock = buildClock(LogicalTimer.class)) { + + new Thread(clock).start(); + + final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); - final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); - try { clock.scheduleAlarm(500, alarmRecorder); clock.scheduleAlarm(500, alarmRecorder); + eventCountLatch.await(10, TimeUnit.SECONDS); + Assert.assertEquals(expectedEvent, alarmRecorder.getEventCount()); - } finally { - clock.close(); } } @Test public void testAlarmOrder() throws Exception { - LoggingUtils.setLoggingLevel(Level.FINE); + + LoggingUtils.setLoggingLevel(Level.FINEST); final int numAlarms = 10; final CountDownLatch eventCountLatch = new CountDownLatch(numAlarms); - final RuntimeClock clock = buildLogicalClock(); - new Thread(clock).start(); + try (final RuntimeClock clock = buildClock(LogicalTimer.class)) { + + new Thread(clock).start(); + + final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); - final EventRecorder alarmRecorder = new EventRecorder(eventCountLatch); - try { final long[] expected = new long[numAlarms]; for (int i = 0; i < numAlarms; ++i) { clock.scheduleAlarm(i * 100, alarmRecorder); @@ -191,15 +191,13 @@ public class ClockTest { eventCountLatch.await(10, TimeUnit.SECONDS); - final Long[] actualLong = new Long[numAlarms]; - alarmRecorder.getTimestamps().toArray(actualLong); + int i = 0; final long[] actual = new long[numAlarms]; - for (int i = 0; i < numAlarms; ++i) { - actual[i] = actualLong[i]; + for (final long ts : alarmRecorder.getTimestamps()) { + actual[i++] = ts; } + Assert.assertArrayEquals(expected, actual); - } finally { - clock.close(); } } @@ -234,7 +232,7 @@ public class ClockTest { @Override public void onNext(final Alarm event) { - timestamps.add(event.getTimeStamp()); + timestamps.add(event.getTimestamp()); events.add(event); 
if (eventCountLatch != null) { eventCountLatch.countDown(); http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java index f60c19c..177dabb 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java @@ -101,7 +101,7 @@ public final class ReefEventStateManager { */ public String getStartTime() { if (startTime != null) { - return convertTime(startTime.getTimeStamp()); + return convertTime(startTime.getTimestamp()); } return null; } @@ -113,7 +113,7 @@ public final class ReefEventStateManager { */ public String getStopTime() { if (stopTime != null) { - return convertTime(stopTime.getTimeStamp()); + return convertTime(stopTime.getTimestamp()); } return null; } http://git-wip-us.apache.org/repos/asf/reef/blob/8e140c72/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java index 0c78e8c..28ef9b8 100644 --- a/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java +++ b/lang/java/reef-webserver/src/test/java/org/apache/reef/webserver/TestHttpConfiguration.java @@ -129,7 +129,7 @@ public class TestHttpConfiguration { final ReefEventStateManager reefEventStateManager = this.injector.getInstance(ReefEventStateManager.class); - 
Assert.assertEquals(reefEventStateManager.getStopTime(), convertTime(st.getTimeStamp())); + Assert.assertEquals(reefEventStateManager.getStopTime(), convertTime(st.getTimestamp())); } @Test @@ -144,7 +144,7 @@ public class TestHttpConfiguration { final ReefEventStateManager reefEventStateManager = this.injector.getInstance(ReefEventStateManager.class); - Assert.assertEquals(reefEventStateManager.getStartTime(), convertTime(st.getTimeStamp())); + Assert.assertEquals(reefEventStateManager.getStartTime(), convertTime(st.getTimestamp())); } private String convertTime(final long time) {
