Repository: reef Updated Branches: refs/heads/master c5322db90 -> 3e9afba8b
[REEF-1780] Improve logging when closing message dispatcher on the evaluator manager shutdown JIRA: [REEF-1780](https://issues.apache.org/jira/browse/REEF-1780) Closes #1270 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3e9afba8 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3e9afba8 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3e9afba8 Branch: refs/heads/master Commit: 3e9afba8b9074f0b7b68215c0ecd9777bae7cd28 Parents: c5322db Author: Shouheng Yi <[email protected]> Authored: Mon Mar 20 12:22:24 2017 -0700 Committer: Sergiy Matusevych <[email protected]> Committed: Mon Apr 17 20:21:31 2017 -0700 ---------------------------------------------------------------------- .../evaluator/EvaluatorMessageDispatcher.java | 5 +++++ .../runtime/common/utils/DispatchingEStage.java | 7 +++++++ .../org/apache/reef/wake/impl/ThreadPoolStage.java | 16 ++++++++++++---- 3 files changed, 24 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/3e9afba8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java index ce879da..73854b2 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java @@ -290,5 +290,10 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable { LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier); // This effectively closes all dispatchers as they share the same stage. this.serviceDispatcher.close(); + if (!this.serviceDispatcher.isThreadPoolClosed()) { + LOG.log(Level.SEVERE, + "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close", + this.evaluatorIdentifier); + } } } http://git-wip-us.apache.org/repos/asf/reef/blob/3e9afba8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java index 3a65df9..f5b1c1d 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java @@ -126,6 +126,13 @@ public final class DispatchingEStage implements AutoCloseable { } /** + * Returns true if the internal thread pool is closed. + */ + public boolean isThreadPoolClosed() { + return this.stage.isClosed(); + } + + /** * Delayed EventHandler.onNext() call. * Contains a message object and EventHandler to process it. */ http://git-wip-us.apache.org/repos/asf/reef/blob/3e9afba8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java index 7b6107f..005db44 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java @@ -27,10 +27,7 @@ import org.apache.reef.wake.exception.WakeRuntimeException; import javax.inject.Inject; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -230,11 +227,22 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> { new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()}); } + if (!executor.isTerminated()) { + LOG.log(Level.SEVERE, "Closing ThreadPoolStage {0}: Executor failed to terminate.", this.name); + } + LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name); } } /** + * Returns true if resources are closed. + */ + public boolean isClosed() { + return closed.get() && executor.isTerminated(); + } + + /** * Gets the queue length of this stage. * * @return the queue length
