Repository: reef
Updated Branches:
  refs/heads/master c5322db90 -> 3e9afba8b


[REEF-1780] Improve logging when closing message dispatcher on the evaluator 
manager shutdown

JIRA: [REEF-1780](https://issues.apache.org/jira/browse/REEF-1780)

Closes #1270


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3e9afba8
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3e9afba8
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3e9afba8

Branch: refs/heads/master
Commit: 3e9afba8b9074f0b7b68215c0ecd9777bae7cd28
Parents: c5322db
Author: Shouheng Yi <[email protected]>
Authored: Mon Mar 20 12:22:24 2017 -0700
Committer: Sergiy Matusevych <[email protected]>
Committed: Mon Apr 17 20:21:31 2017 -0700

----------------------------------------------------------------------
 .../evaluator/EvaluatorMessageDispatcher.java       |  5 +++++
 .../runtime/common/utils/DispatchingEStage.java     |  7 +++++++
 .../org/apache/reef/wake/impl/ThreadPoolStage.java  | 16 ++++++++++++----
 3 files changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/3e9afba8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index ce879da..73854b2 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -290,5 +290,10 @@ public final class EvaluatorMessageDispatcher implements AutoCloseable {
     LOG.log(Level.FINER, "Closing message dispatcher for {0}", this.evaluatorIdentifier);
     // This effectively closes all dispatchers as they share the same stage.
     this.serviceDispatcher.close();
+    if (!this.serviceDispatcher.isThreadPoolClosed()) {
+      LOG.log(Level.SEVERE,
+              "Closing message dispatcher for {0}: ThreadPool for service dispatcher failed to close",
+              this.evaluatorIdentifier);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/3e9afba8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index 3a65df9..f5b1c1d 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -126,6 +126,13 @@ public final class DispatchingEStage implements AutoCloseable {
   }
 
   /**
+   * Returns true if the internal thread pool is closed.
+   */
+  public boolean isThreadPoolClosed() {
+    return this.stage.isClosed();
+  }
+
+  /**
    * Delayed EventHandler.onNext() call.
    * Contains a message object and EventHandler to process it.
    */

http://git-wip-us.apache.org/repos/asf/reef/blob/3e9afba8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 7b6107f..005db44 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -27,10 +27,7 @@ import org.apache.reef.wake.exception.WakeRuntimeException;
 
 import javax.inject.Inject;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -230,11 +227,22 @@ public final class ThreadPoolStage<T> extends AbstractEStage<T> {
             new Object[] {this.name, SHUTDOWN_TIMEOUT, droppedRunnables.size()});
       }
 
+      if (!executor.isTerminated()) {
+        LOG.log(Level.SEVERE, "Closing ThreadPoolStage {0}: Executor failed to terminate.", this.name);
+      }
+
       LOG.log(Level.FINEST, "Closing ThreadPoolStage {0}: end", this.name);
     }
   }
 
   /**
+   * Returns true if resources are closed.
+   */
+  public boolean isClosed() {
+    return closed.get() && executor.isTerminated();
+  }
+
+  /**
    * Gets the queue length of this stage.
    *
    * @return the queue length

Reply via email to