Repository: reef Updated Branches: refs/heads/master 3e9afba8b -> d426e2b8b
[REEF-1726] Close message dispatcher on the evaluator manager shutdown JIRA: [REEF-1726](https://issues.apache.org/jira/browse/REEF-1726) Pull Request: This closes #1241 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d426e2b8 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d426e2b8 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d426e2b8 Branch: refs/heads/master Commit: d426e2b8bcb42ba2bd22a8ca112041ab9fd2a7b1 Parents: 3e9afba Author: Sergiy Matusevych <[email protected]> Authored: Wed Jan 25 13:58:28 2017 -0800 Committer: Markus Weimer <[email protected]> Committed: Wed Apr 19 09:07:32 2017 -0700 ---------------------------------------------------------------------- .../driver/evaluator/EvaluatorManager.java | 5 ++++- .../evaluator/EvaluatorMessageDispatcher.java | 6 ++--- .../runtime/common/utils/DispatchingEStage.java | 23 +++++++++++++++----- .../org/apache/reef/wake/AbstractEStage.java | 16 ++++++++++++++ .../apache/reef/wake/impl/ThreadPoolStage.java | 7 ------ 5 files changed, 40 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index 61564b1..c555adc 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -263,15 +263,18 @@ public final class EvaluatorManager implements Identifiable, 
AutoCloseable { try { // We need to wait awhile before returning the container to the RM // in order to give the EvaluatorRuntime (and Launcher) time to cleanly exit. - this.clock.scheduleAlarm(100, new EventHandler<Alarm>() { + this.clock.scheduleAlarm(200, new EventHandler<Alarm>() { @Override public void onNext(final Alarm alarm) { + LOG.log(Level.FINER, "Close EvaluatorManager {0} - release to RM", evaluatorId); resourceReleaseHandler.onNext(releaseEvent); + shutdown(); } }); } catch (final IllegalStateException e) { LOG.log(Level.WARNING, "Force resource release because the client closed the clock.", e); this.resourceReleaseHandler.onNext(releaseEvent); + this.shutdown(); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java index 73854b2..e113e76 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java @@ -290,10 +290,10 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable { LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier); // This effectively closes all dispatchers as they share the same stage. 
this.serviceDispatcher.close(); - if (!this.serviceDispatcher.isThreadPoolClosed()) { + if (!this.serviceDispatcher.isClosed()) { LOG.log(Level.SEVERE, - "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close", - this.evaluatorIdentifier); + "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close", + this.evaluatorIdentifier); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java index f5b1c1d..91ff7d6 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java @@ -28,6 +28,8 @@ import org.apache.reef.wake.impl.ThreadPoolStage; import java.util.Collections; import java.util.Map; import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Delayed event router that dispatches messages to the proper event handler by type. @@ -37,6 +39,8 @@ import java.util.Set; @DriverSide public final class DispatchingEStage implements AutoCloseable { + private static final Logger LOG = Logger.getLogger(DispatchingEStage.class.getName()); + /** * A map of event handlers, populated in the register() method. */ @@ -98,7 +102,7 @@ public final class DispatchingEStage implements AutoCloseable { /** * Dispatch a new message by type. - * + * If the stage is already closed, log a warning and ignore the message. * @param type Type of event handler - must match the register() call. * @param message A message to process. Must be a subclass of T. 
* @param <T> Message type that event handler supports. @@ -106,8 +110,13 @@ public final class DispatchingEStage implements AutoCloseable { */ @SuppressWarnings("unchecked") public <T, U extends T> void onNext(final Class<T> type, final U message) { - final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type); - this.stage.onNext(new DelayedOnNext(handler, message)); + if (this.isClosed()) { + LOG.log(Level.WARNING, "Dispatcher {0} already closed: ignoring message {1}: {2}", + new Object[] {this.stage, type.getCanonicalName(), message}); + } else { + final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type); + this.stage.onNext(new DelayedOnNext(handler, message)); + } } /** @@ -118,7 +127,8 @@ public final class DispatchingEStage implements AutoCloseable { } /** - * Close the internal thread pool. + * Close the stage and stop accepting new messages. + * Closes the internal thread pool. */ @Override public void close() { @@ -126,9 +136,10 @@ public final class DispatchingEStage implements AutoCloseable { } /** - * Returns true if the internal thread pool is closed. + * Check if the stage can still accept messages. + * @return true if the stage can no longer accept messages, false otherwise. 
*/ - public boolean isThreadPoolClosed() { + public boolean isClosed() { return this.stage.isClosed(); } http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java index 2861204..5395054 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java @@ -88,4 +88,20 @@ public abstract class AbstractEStage<T> implements EStage<T> { outMeter.mark(1); } + /** + * Check if the stage can still accept messages. + * @return true if the stage is closed, false otherwise. + */ + public boolean isClosed() { + return closed.get(); + } + + /** + * Get human readable representation of the class (used for logging). + * @return A string that contains stage name. + */ + @Override + public String toString() { + return String.format("Stage:%s:%s", this.getClass().getCanonicalName(), name); + } } http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java index 005db44..4c57fa7 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java @@ -236,13 +236,6 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> { } /** - * Returns true if resources are closed. 
- */ - public boolean isClosed() { - return closed.get() && executor.isTerminated(); - } - - /** * Gets the queue length of this stage. * * @return the queue length
