Repository: reef Updated Branches: refs/heads/master f5ae65982 -> d5a671b85
[REEF-1403] Deadlock between ContextRuntime.StartTask and HeartBeatManager.OnNext(Alarm) This addresses the issue by * Locking on HeartBeatManager (instead of the evaluator ID) on Evaluator startup in EvaluatorRuntime, and also when reporting the evaluator status and handling RuntimeStop. * Replacing the private _stateLock in TaskStatus with a lock on HeartBeatManager, so that task state transitions and task status reporting lock on the same object. * Lowering several EvaluatorRuntime log messages from Info to Verbose. JIRA: [REEF-1403](https://issues.apache.org/jira/browse/REEF-1403) Pull Request: This closes #1016 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d5a671b8 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d5a671b8 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d5a671b8 Branch: refs/heads/master Commit: d5a671b85ed24c3e068e649f41185e0bd8a2be0e Parents: f5ae659 Author: Andrew Chung <[email protected]> Authored: Thu May 26 09:53:43 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue May 31 14:19:58 2016 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/EvaluatorRuntime.cs | 79 +++++++++++--------- .../Runtime/Evaluator/Task/TaskStatus.cs | 15 ++-- 2 files changed, 49 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d5a671b8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index 1c92192..b8e3437 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -145,7 +145,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } if (message.kill_evaluator != null) { - Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId)); + Logger.Log(Level.Info, "Evaluator {0} has been killed by the driver.", _evaluatorId); _state = 
State.KILLED; _clock.Dispose(); } @@ -155,18 +155,20 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator public EvaluatorStatusProto GetEvaluatorStatus() { - Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state: {0}", _state)); - EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto + lock (_heartBeatManager) { - evaluator_id = _evaluatorId, - state = _state - }; - return evaluatorStatusProto; + Logger.Log(Level.Verbose, "Evaluator state: {0}", _state); + return new EvaluatorStatusProto + { + evaluator_id = _evaluatorId, + state = _state + }; + } } public void OnNext(RuntimeStart runtimeStart) { - lock (_evaluatorId) + lock (_heartBeatManager) { try { @@ -191,40 +193,43 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator public void OnNext(RuntimeStop runtimeStop) { - Logger.Log(Level.Info, "Runtime stop"); + Logger.Log(Level.Verbose, "Runtime stop"); - if (_state == State.RUNNING) + lock (_heartBeatManager) { - const string msg = "RuntimeStopHandler invoked in state RUNNING."; - if (runtimeStop.Exception != null) + if (_state == State.RUNNING) { - OnException(new SystemException(msg, runtimeStop.Exception)); + const string msg = "RuntimeStopHandler invoked in state RUNNING."; + if (runtimeStop.Exception != null) + { + OnException(new SystemException(msg, runtimeStop.Exception)); + } + else + { + OnException(new SystemException(msg)); + } } else { - OnException(new SystemException(msg)); - } - } - else - { - var exceptionOccurredOnDispose = false; - try - { - _contextManager.Dispose(); - _evaluatorControlChannel.Dispose(); - } - catch (Exception e) - { - exceptionOccurredOnDispose = true; - Utilities.Diagnostics.Exceptions.CaughtAndThrow( - new InvalidOperationException("Cannot stop evaluator properly", e), - Level.Error, - "Exception during shut down.", - Logger); - } - finally - { - _evaluatorExitLogger.LogExit(exceptionOccurredOnDispose); + var exceptionOccurredOnDispose = false; + try + { + 
_contextManager.Dispose(); + _evaluatorControlChannel.Dispose(); + } + catch (Exception e) + { + exceptionOccurredOnDispose = true; + Utilities.Diagnostics.Exceptions.CaughtAndThrow( + new InvalidOperationException("Cannot stop evaluator properly", e), + Level.Error, + "Exception during shut down.", + Logger); + } + finally + { + _evaluatorExitLogger.LogExit(exceptionOccurredOnDispose); + } } } } @@ -233,7 +238,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { if (value != null && value.evaluatorControl != null) { - Logger.Log(Level.Info, "Received a REEFMessage with EvaluatorControl"); + Logger.Log(Level.Verbose, "Received a REEFMessage with EvaluatorControl"); Handle(value.evaluatorControl); } } http://git-wip-us.apache.org/repos/asf/reef/blob/d5a671b8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index 3e8a128..ae0141e 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -38,7 +38,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus)); - private readonly object _stateLock = new object(); private readonly BinaryFormatter _binaryFormatter = new BinaryFormatter(); private readonly TaskLifeCycle _taskLifeCycle; @@ -101,7 +100,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void SetException(Exception e) { - lock (_stateLock) + lock (_heartBeatManager) { try { @@ -132,7 +131,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void SetResult(byte[] result) { - lock (_stateLock) + lock (_heartBeatManager) { _result = Optional<byte[]>.OfNullable(result); switch (State) @@ -152,7 +151,7 @@ namespace 
Org.Apache.REEF.Common.Runtime.Evaluator.Task public void SetRunning() { - lock (_stateLock) + lock (_heartBeatManager) { LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning"); if (_state == TaskState.Init) @@ -177,7 +176,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void SetCloseRequested() { - lock (_stateLock) + lock (_heartBeatManager) { if (HasEnded()) { @@ -190,7 +189,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void SetSuspendRequested() { - lock (_stateLock) + lock (_heartBeatManager) { if (HasEnded()) { @@ -203,7 +202,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void SetKilled() { - lock (_stateLock) + lock (_heartBeatManager) { if (HasEnded()) { @@ -239,7 +238,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { // This is locked because the Task continuation thread which sets the // result is potentially different from the HeartBeat thread. - lock (_stateLock) + lock (_heartBeatManager) { Check(); TaskStatusProto taskStatusProto = new TaskStatusProto()
