Repository: incubator-reef Updated Branches: refs/heads/master 7dae6ff6d -> 0fa76c5c8
[REEF-151] Fixed a synchronization issue in EvaluatorManager In EvaluatorManager, `onEvaluatorHeartbeatMessage` was `synchronized` (locking on `this`), while `onEvaluatorException` was `synchronized(evaluatorDescriptor)`. Because the two methods locked on different monitors, they could run concurrently, which creates the race condition described in the JIRA. This PR changes the synchronization in `onEvaluatorHeartbeatMessage` to match the common pattern in the rest of the file. Also, this makes `EvaluatorControlHandler.send()` safe to use when the Evaluator is already down. It used to throw an `IllegalStateException` in that situation; it now writes a WARNING to the log and drops the control message instead. JIRA: [REEF-151](https://issues.apache.org/jira/browse/REEF-151) Pull Request: Closes #89 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/0fa76c5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/0fa76c5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/0fa76c5c Branch: refs/heads/master Commit: 0fa76c5c8dcd3ed711091ff42d3b905267b41f83 Parents: 7dae6ff Author: Markus Weimer <[email protected]> Authored: Thu Feb 19 14:27:35 2015 -0800 Committer: taegeonum <[email protected]> Committed: Sat Feb 21 12:27:01 2015 +0900 ---------------------------------------------------------------------- .../evaluator/EvaluatorControlHandler.java | 12 +-- .../driver/evaluator/EvaluatorManager.java | 94 ++++++++++---------- 2 files changed, 51 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0fa76c5c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java index ce62027..3a25783 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorControlHandler.java @@ -69,15 +69,9 @@ public final class EvaluatorControlHandler { throw new IllegalStateException("Trying to send an EvaluatorControlProto before the Evaluator ID is set."); } if (!this.stateManager.isRunning()) { - final String msg = new StringBuilder() - .append("Trying to send an EvaluatorControlProto to Evaluator [") - .append(this.evaluatorId) - .append("] that is in state [") - .append(this.stateManager.toString()) - .append("], not [RUNNING]. The control message was: ") - .append(evaluatorControlProto.toString()) - .toString(); - throw new IllegalStateException(msg); + LOG.log(Level.WARNING, "Trying to send an EvaluatorControlProto to Evaluator [{0}] that is in state [{1}], not [RUNNING]. 
The control message was: {2}", + new Object[]{this.evaluatorId, this.stateManager, evaluatorControlProto}); + return; } this.wrapped.get().onNext(evaluatorControlProto); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0fa76c5c/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java index 3938e06..a257234 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManager.java @@ -277,69 +277,71 @@ public final class EvaluatorManager implements Identifiable, AutoCloseable { } } - public synchronized void onEvaluatorHeartbeatMessage( + public void onEvaluatorHeartbeatMessage( final RemoteMessage<EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto> evaluatorHeartbeatProtoRemoteMessage) { final EvaluatorRuntimeProtocol.EvaluatorHeartbeatProto evaluatorHeartbeatProto = evaluatorHeartbeatProtoRemoteMessage.getMessage(); LOG.log(Level.FINEST, "Evaluator heartbeat: {0}", evaluatorHeartbeatProto); - if (this.stateManager.isDoneOrFailedOrKilled()) { - LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.", - new Object[]{this.getId(), this.stateManager}); - return; - } + synchronized (this.evaluatorDescriptor) { + if (this.stateManager.isDoneOrFailedOrKilled()) { + LOG.log(Level.FINE, "Ignoring an heartbeat received for Evaluator {0} which is already in state {1}.", + new Object[]{this.getId(), this.stateManager}); + return; + } - this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp()); - final String 
evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); + this.sanityChecker.check(evaluatorId, evaluatorHeartbeatProto.getTimestamp()); + final String evaluatorRID = evaluatorHeartbeatProtoRemoteMessage.getIdentifier().toString(); - // first message from a running evaluator trying to re-establish communications - if (evaluatorHeartbeatProto.getRecovery()) { - this.evaluatorControlHandler.setRemoteID(evaluatorRID); - this.stateManager.setRunning(); + // first message from a running evaluator trying to re-establish communications + if (evaluatorHeartbeatProto.getRecovery()) { + this.evaluatorControlHandler.setRemoteID(evaluatorRID); + this.stateManager.setRunning(); - this.driverStatusManager.oneContainerRecovered(); - final int numRecoveredContainers = this.driverStatusManager.getNumRecoveredContainers(); + this.driverStatusManager.oneContainerRecovered(); + final int numRecoveredContainers = this.driverStatusManager.getNumRecoveredContainers(); - LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId); - final int expectedEvaluatorsNumber = this.driverStatusManager.getNumPreviousContainers(); + LOG.log(Level.FINE, "Received recovery heartbeat from evaluator {0}.", this.evaluatorId); + final int expectedEvaluatorsNumber = this.driverStatusManager.getNumPreviousContainers(); - if (numRecoveredContainers > expectedEvaluatorsNumber) { - LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, but [{1}] evaluators have checked in.", - new Object[]{expectedEvaluatorsNumber, numRecoveredContainers}); - throw new RuntimeException("More then expected number of evaluators are checking in during recovery."); - } else if (numRecoveredContainers == expectedEvaluatorsNumber) { - LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. 
Recovery completed.", expectedEvaluatorsNumber); - this.driverStatusManager.setRestartCompleted(); - this.messageDispatcher.OnDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis())); - } else { - LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] evaluators have checked in.", - new Object[]{expectedEvaluatorsNumber, numRecoveredContainers}); + if (numRecoveredContainers > expectedEvaluatorsNumber) { + LOG.log(Level.SEVERE, "expecting only [{0}] recovered evaluators, but [{1}] evaluators have checked in.", + new Object[]{expectedEvaluatorsNumber, numRecoveredContainers}); + throw new RuntimeException("More then expected number of evaluators are checking in during recovery."); + } else if (numRecoveredContainers == expectedEvaluatorsNumber) { + LOG.log(Level.INFO, "All [{0}] expected evaluators have checked in. Recovery completed.", expectedEvaluatorsNumber); + this.driverStatusManager.setRestartCompleted(); + this.messageDispatcher.OnDriverRestartCompleted(new DriverRestartCompleted(System.currentTimeMillis())); + } else { + LOG.log(Level.INFO, "expecting [{0}] recovered evaluators, [{1}] evaluators have checked in.", + new Object[]{expectedEvaluatorsNumber, numRecoveredContainers}); + } } - } - // If this is the first message from this Evaluator, register it. - if (this.stateManager.isSubmitted()) { - this.evaluatorControlHandler.setRemoteID(evaluatorRID); - this.stateManager.setRunning(); - LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId); - } + // If this is the first message from this Evaluator, register it. 
+ if (this.stateManager.isSubmitted()) { + this.evaluatorControlHandler.setRemoteID(evaluatorRID); + this.stateManager.setRunning(); + LOG.log(Level.FINEST, "Evaluator {0} is running", this.evaluatorId); + } - // Process the Evaluator status message - if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { - this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus()); - } + // Process the Evaluator status message + if (evaluatorHeartbeatProto.hasEvaluatorStatus()) { + this.onEvaluatorStatusMessage(evaluatorHeartbeatProto.getEvaluatorStatus()); + } - // Process the Context status message(s) - final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus(); - this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(), - informClientOfNewContexts); + // Process the Context status message(s) + final boolean informClientOfNewContexts = !evaluatorHeartbeatProto.hasTaskStatus(); + this.contextRepresenters.onContextStatusMessages(evaluatorHeartbeatProto.getContextStatusList(), + informClientOfNewContexts); - // Process the Task status message - if (evaluatorHeartbeatProto.hasTaskStatus()) { - this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus()); + // Process the Task status message + if (evaluatorHeartbeatProto.hasTaskStatus()) { + this.onTaskStatusMessage(evaluatorHeartbeatProto.getTaskStatus()); + } + LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); } - LOG.log(Level.FINE, "DONE with evaluator heartbeat from Evaluator {0}", this.getId()); } /**
