Repository: incubator-reef
Updated Branches:
  refs/heads/master 7dae6ff6d -> 0fa76c5c8


[REEF-151] Fixed a synchronization issue in EvaluatorManager

  In EvaluatorManager, `onEvaluatorHeartbeatMessage` was `synchronized`
  (locking on the EvaluatorManager instance), while `onEvaluatorException`
  was `synchronized(evaluatorDescriptor)`. Because the two methods held
  different locks, they could run concurrently, which creates the race
  condition described in the JIRA. This PR changes the synchronization in
  `onEvaluatorHeartbeatMessage` to `synchronized(evaluatorDescriptor)`,
  matching the common pattern in the rest of the file.

  Also, this makes `EvaluatorControlHandler.send()` safe to call when the
  Evaluator is already down. It used to throw an `IllegalStateException` in
  that situation; it now logs a WARNING and drops the control message instead.

JIRA:
  [REEF-151](https://issues.apache.org/jira/browse/REEF-151)

Pull Request:
  Closes #89


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0fa76c5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0fa76c5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0fa76c5c

Branch: refs/heads/master
Commit: 0fa76c5c8dcd3ed711091ff42d3b905267b41f83
Parents: 7dae6ff
Author: Markus Weimer <[email protected]>
Authored: Thu Feb 19 14:27:35 2015 -0800
Committer: taegeonum <[email protected]>
Committed: Sat Feb 21 12:27:01 2015 +0900

----------------------------------------------------------------------
 .../evaluator/EvaluatorControlHandler.java      | 12 +--
 .../driver/evaluator/EvaluatorManager.java      | 94 ++++++++++----------
 2 files changed, 51 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0fa76c5c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
index ce62027..3a25783 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java
@@ -69,15 +69,9 @@ public final class EvaluatorControlHandler {
       throw new IllegalStateException("Trying to send an EvaluatorControlProto 
before the Evaluator ID is set.");
     }
     if (!this.stateManager.isRunning()) {
-      final String msg = new StringBuilder()
-          .append("Trying to send an EvaluatorControlProto to Evaluator [")
-          .append(this.evaluatorId)
-          .append("] that is in state [")
-          .append(this.stateManager.toString())
-          .append("], not [RUNNING]. The control message was: ")
-          .append(evaluatorControlProto.toString())
-          .toString();
-      throw new IllegalStateException(msg);
+      LOG.log(Level.WARNING, "Trying to send an EvaluatorControlProto to 
Evaluator [{0}] that is in state [{1}], not [RUNNING]. The control message was: 
{2}",
+          new Object[]{this.evaluatorId, this.stateManager, 
evaluatorControlProto});
+      return;
     }
     this.wrapped.get().onNext(evaluatorControlProto);
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0fa76c5c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
index 3938e06..a257234 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java
@@ -277,69 +277,71 @@ public final class EvaluatorManager implements 
Identifiable, AutoCloseable {
     }
   }
 
-  public synchronized void onEvaluatorHeartbeatMessage(
+  public void onEvaluatorHeartbeatMessage(
       final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> 
evaluatorHeartbeatProtoRemoteMessage) {
 
     final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto 
evaluatorHeartbeatProto =
         evaluatorHeartbeatProtoRemoteMessage.getMessage();
     LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto);
 
-    if (this.stateManager.isDoneOrFailedOrKilled()) {
-      LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} 
which is already in state {1}.",
-          new Object[]{this.getId(), this.stateManager});
-      return;
-    }
+    synchronized (this.evaluatorDescriptor) {
+      if (this.stateManager.isDoneOrFailedOrKilled()) {
+        LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} 
which is already in state {1}.",
+            new Object[]{this.getId(), this.stateManager});
+        return;
+      }
 
-    this.sanityChecker.check(evaluatorId, 
evaluatorHeartbeatProto.getTimestamp());
-    final String evaluatorRID = 
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
+      this.sanityChecker.check(evaluatorId, 
evaluatorHeartbeatProto.getTimestamp());
+      final String evaluatorRID = 
evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString();
 
-    // first message from a running evaluator trying to re-establish 
communications
-    if (evaluatorHeartbeatProto.getRecovery()) {
-      this.evaluatorControlHandler.setRemoteID(evaluatorRID);
-      this.stateManager.setRunning();
+      // first message from a running evaluator trying to re-establish 
communications
+      if (evaluatorHeartbeatProto.getRecovery()) {
+        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+        this.stateManager.setRunning();
 
-      this.driverStatusManager.oneContainerRecovered();
-      final int numRecoveredContainers = 
this.driverStatusManager.getNumRecoveredContainers();
+        this.driverStatusManager.oneContainerRecovered();
+        final int numRecoveredContainers = 
this.driverStatusManager.getNumRecoveredContainers();
 
-      LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", 
this.evaluatorId);
-      final int expectedEvaluatorsNumber = 
this.driverStatusManager.getNumPreviousContainers();
+        LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", 
this.evaluatorId);
+        final int expectedEvaluatorsNumber = 
this.driverStatusManager.getNumPreviousContainers();
 
-      if (numRecoveredContainers > expectedEvaluatorsNumber) {
-        LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, but 
[{1}] evaluators have checked in.",
-            new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
-        throw new RuntimeException("More then expected number of evaluators 
are checking in during recovery.");
-      } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
-        LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. 
Recovery completed.", expectedEvaluatorsNumber);
-        this.driverStatusManager.setRestartCompleted();
-        this.messageDispatcher.OnDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
-      } else {
-        LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] 
evaluators have checked in.",
-            new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+        if (numRecoveredContainers > expectedEvaluatorsNumber) {
+          LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, 
but [{1}] evaluators have checked in.",
+              new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+          throw new RuntimeException("More then expected number of evaluators 
are checking in during recovery.");
+        } else if (numRecoveredContainers == expectedEvaluatorsNumber) {
+          LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. 
Recovery completed.", expectedEvaluatorsNumber);
+          this.driverStatusManager.setRestartCompleted();
+          this.messageDispatcher.OnDriverRestartCompleted(new 
DriverRestartCompleted(System.currentTimeMillis()));
+        } else {
+          LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] 
evaluators have checked in.",
+              new Object[]{expectedEvaluatorsNumber, numRecoveredContainers});
+        }
       }
-    }
 
-    // If this is the first message from this Evaluator, register it.
-    if (this.stateManager.isSubmitted()) {
-      this.evaluatorControlHandler.setRemoteID(evaluatorRID);
-      this.stateManager.setRunning();
-      LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
-    }
+      // If this is the first message from this Evaluator, register it.
+      if (this.stateManager.isSubmitted()) {
+        this.evaluatorControlHandler.setRemoteID(evaluatorRID);
+        this.stateManager.setRunning();
+        LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId);
+      }
 
-    // Process the Evaluator status message
-    if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
-      
this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus());
-    }
+      // Process the Evaluator status message
+      if (evaluatorHeartbeatProto.hasEvaluatorStatus()) {
+        
this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus());
+      }
 
-    // Process the Context status message(s)
-    final boolean informClientOfNewContexts = 
!evaluatorHeartbeatProto.hasTaskStatus();
-    
this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(),
-        informClientOfNewContexts);
+      // Process the Context status message(s)
+      final boolean informClientOfNewContexts = 
!evaluatorHeartbeatProto.hasTaskStatus();
+      
this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(),
+          informClientOfNewContexts);
 
-    // Process the Task status message
-    if (evaluatorHeartbeatProto.hasTaskStatus()) {
-      this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus());
+      // Process the Task status message
+      if (evaluatorHeartbeatProto.hasTaskStatus()) {
+        this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus());
+      }
+      LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", 
this.getId());
     }
-    LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", 
this.getId());
   }
 
   /**

Reply via email to