Repository: reef
Updated Branches:
  refs/heads/master 3e9afba8b -> d426e2b8b


[REEF-1726] Close message dispatcher on the evaluator manager shutdown

JIRA:
  [REEF-1726](https://issues.apache.org/jira/browse/REEF-1726)

Pull Request:
  This closes #1241


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d426e2b8
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d426e2b8
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d426e2b8

Branch: refs/heads/master
Commit: d426e2b8bcb42ba2bd22a8ca112041ab9fd2a7b1
Parents: 3e9afba
Author: Sergiy Matusevych <[email protected]>
Authored: Wed Jan 25 13:58:28 2017 -0800
Committer: Markus Weimer <[email protected]>
Committed: Wed Apr 19 09:07:32 2017 -0700

----------------------------------------------------------------------
 .../driver/evaluator/EvaluatorManager.java      |  5 ++++-
 .../evaluator/EvaluatorMessageDispatcher.java   |  6 ++---
 .../runtime/common/utils/DispatchingEStage.java | 23 +++++++++++++++-----
 .../org/apache/reef/wake/AbstractEStage.java    | 16 ++++++++++++++
 .../apache/reef/wake/impl/ThreadPoolStage.java  |  7 ------
 5 files changed, 40 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 61564b1..c555adc 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -263,15 +263,18 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
         try {
           // We need to wait awhile before returning the container to the RM
           // in order to give the EvaluatorRuntime (and Launcher) time to 
cleanly exit.
-          this.clock.scheduleAlarm(100, new EventHandler<Alarm>() {
+          this.clock.scheduleAlarm(200, new EventHandler<Alarm>() {
             @Override
             public void onNext(final Alarm alarm) {
+              LOG.log(Level.FINER, "Close EvaluatorManager {0} - release to 
RM", evaluatorId);
               resourceReleaseHandler.onNext(releaseEvent);
+              shutdown();
             }
           });
         } catch (final IllegalStateException e) {
           LOG.log(Level.WARNING, "Force resource release because the client 
closed the clock.", e);
           this.resourceReleaseHandler.onNext(releaseEvent);
+          this.shutdown();
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
index 73854b2..e113e76 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorMessageDispatcher.java
@@ -290,10 +290,10 @@ public final class EvaluatorMessageDispatcher implements 
AutoCloseable {
     LOG.log(Level.FINER, "Closing message dispatcher for {0}", 
this.evaluatorIdentifier);
     // This effectively closes all dispatchers as they share the same stage.
     this.serviceDispatcher.close();
-    if (!this.serviceDispatcher.isThreadPoolClosed()) {
+    if (!this.serviceDispatcher.isClosed()) {
       LOG.log(Level.SEVERE,
-              "Closing message dispatcher for {0}: ThreadPool for service 
dispatcher failed to close",
-              this.evaluatorIdentifier);
+          "Closing message dispatcher for {0}: ThreadPool for service 
dispatcher failed to close",
+          this.evaluatorIdentifier);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
index f5b1c1d..91ff7d6 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/DispatchingEStage.java
@@ -28,6 +28,8 @@ import org.apache.reef.wake.impl.ThreadPoolStage;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 /**
  * Delayed event router that dispatches messages to the proper event handler 
by type.
@@ -37,6 +39,8 @@ import java.util.Set;
 @DriverSide
 public final class DispatchingEStage implements AutoCloseable {
 
+  private static final Logger LOG = 
Logger.getLogger(DispatchingEStage.class.getName());
+
   /**
    * A map of event handlers, populated in the register() method.
    */
@@ -98,7 +102,7 @@ public final class DispatchingEStage implements 
AutoCloseable {
 
   /**
    * Dispatch a new message by type.
-   *
+   * If the stage is already closed, log a warning and ignore the message.
    * @param type    Type of event handler - must match the register() call.
    * @param message A message to process. Must be a subclass of T.
    * @param <T>     Message type that event handler supports.
@@ -106,8 +110,13 @@ public final class DispatchingEStage implements 
AutoCloseable {
    */
   @SuppressWarnings("unchecked")
   public <T, U extends T> void onNext(final Class<T> type, final U message) {
-    final EventHandler<T> handler = (EventHandler<T>) this.handlers.get(type);
-    this.stage.onNext(new DelayedOnNext(handler, message));
+    if (this.isClosed()) {
+      LOG.log(Level.WARNING, "Dispatcher {0} already closed: ignoring message 
{1}: {2}",
+          new Object[] {this.stage, type.getCanonicalName(), message});
+    } else {
+      final EventHandler<T> handler = (EventHandler<T>) 
this.handlers.get(type);
+      this.stage.onNext(new DelayedOnNext(handler, message));
+    }
   }
 
   /**
@@ -118,7 +127,8 @@ public final class DispatchingEStage implements 
AutoCloseable {
   }
 
   /**
-   * Close the internal thread pool.
+   * Close the stage and stop accepting new messages.
+   * Closes the internal thread pool.
    */
   @Override
   public void close() {
@@ -126,9 +136,10 @@ public final class DispatchingEStage implements 
AutoCloseable {
   }
 
   /**
-   * Returns true if the internal thread pool is closed.
+   * Check if the stage can still accept messages.
+   * @return true if the stage can no longer accept messages, false otherwise.
    */
-  public boolean isThreadPoolClosed() {
+  public boolean isClosed() {
     return this.stage.isClosed();
   }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
index 2861204..5395054 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/AbstractEStage.java
@@ -88,4 +88,20 @@ public abstract class AbstractEStage<T> implements EStage<T> 
{
     outMeter.mark(1);
   }
 
+  /**
+   * Check if the stage can still accept messages.
+   * @return true if the stage is closed, false otherwise.
+   */
+  public boolean isClosed() {
+    return closed.get();
+  }
+
+  /**
+   * Get human readable representation of the class (used for logging).
+   * @return A string that contains stage name.
+   */
+  @Override
+  public String toString() {
+    return String.format("Stage:%s:%s", this.getClass().getCanonicalName(), 
name);
+  }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d426e2b8/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
index 005db44..4c57fa7 100644
--- 
a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
+++ 
b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/ThreadPoolStage.java
@@ -236,13 +236,6 @@ public final class ThreadPoolStage<T> extends 
AbstractEStage<T> {
   }
 
   /**
-   * Returns true if resources are closed.
-   */
-  public boolean isClosed() {
-    return closed.get() && executor.isTerminated();
-  }
-
-  /**
    * Gets the queue length of this stage.
    *
    * @return the queue length

Reply via email to