Repository: reef Updated Branches: refs/heads/master ffa976a70 -> be87fe58f
[REEF-1343] Fix events received in case of evaluator failure This addresses the issue by: * Invoking RuntimeStop on Exception in RuntimeClock. * Fixing the handling of Exceptions in EvaluatorRuntime. JIRA: [REEF-1343](https://issues.apache.org/jira/browse/REEF-1343) Pull Request: This closes #961 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/be87fe58 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/be87fe58 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/be87fe58 Branch: refs/heads/master Commit: be87fe58f8cc80ad84cd4494812fcaa310f4a31e Parents: ffa976a Author: Andrew Chung <[email protected]> Authored: Wed Apr 20 14:12:23 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon Apr 25 14:06:37 2016 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/EvaluatorRuntime.cs | 54 +++++++++------ .../Functional/FaultTolerant/PoisonTest.cs | 70 +++++++++++++------- .../Time/Runtime/RuntimeClock.cs | 45 ++++++++----- 3 files changed, 111 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/be87fe58/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index 45ff233..495b2c2 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -80,7 +80,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator Logger.Log(Level.Info, "Handle Evaluator control message"); if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase)) { - Handle(new InvalidOperationException( + OnException(new 
InvalidOperationException( string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId))); } else if (_state == State.DONE) @@ -94,12 +94,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } else { - Handle(new InvalidOperationException("Received a control message from Driver after Evaluator is done.")); + OnException(new InvalidOperationException("Received a control message from Driver after Evaluator is done.")); } } else if (_state != State.RUNNING) { - Handle(new InvalidOperationException( + OnException(new InvalidOperationException( string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state))); } else @@ -120,7 +120,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator catch (Exception e) { Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, Logger); - Handle(e); + OnException(e); Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), Logger); } } @@ -164,7 +164,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator catch (Exception e) { Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, Logger); - Handle(e); + OnException(e); } } } @@ -172,22 +172,33 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator public void OnNext(RuntimeStop runtimeStop) { Logger.Log(Level.Info, "Runtime stop"); - _contextManager.Dispose(); if (_state == State.RUNNING) { - _state = State.DONE; - _heartBeatManager.OnNext(); - } - try - { - _evaluatorControlChannel.Dispose(); + const string msg = "RuntimeStopHandler invoked in state RUNNING."; + if (runtimeStop.Exception != null) + { + OnException(new SystemException(msg, runtimeStop.Exception)); + } + else + { + OnException(new SystemException(msg)); + } } - catch (Exception e) + else { - Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), 
Level.Error, "Exception during shut down.", Logger); + try + { + _contextManager.Dispose(); + _evaluatorControlChannel.Dispose(); + } + catch (Exception e) + { + Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", Logger); + } } - Logger.Log(Level.Info, "EvaluatorRuntime shutdown complete"); + + Logger.Log(Level.Info, "EvaluatorRuntime shutdown complete"); } public void OnNext(REEFMessage value) @@ -199,27 +210,30 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } - private void Handle(Exception e) + private void OnException(Exception e) { lock (_heartBeatManager) { - Logger.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e); + Logger.Log(Level.Error, "Evaluator {0} failed with exception {1}.", _evaluatorId, e); _state = State.FAILED; - string errorMessage = string.Format( + + var errorMessage = string.Format( CultureInfo.InvariantCulture, "failed with error [{0}] with message [{1}] and stack trace [{2}]", e, e.Message, e.StackTrace); - EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto() + + var evaluatorStatusProto = new EvaluatorStatusProto() { evaluator_id = _evaluatorId, error = ByteUtilities.StringToByteArrays(errorMessage), state = _state }; + _heartBeatManager.OnNext(evaluatorStatusProto); _contextManager.Dispose(); - } + } } public void OnError(Exception error) http://git-wip-us.apache.org/repos/asf/reef/blob/be87fe58/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs index 7aab1ac..b0d1c40 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs +++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs @@ -16,6 +16,7 @@ // under the License. using System; +using System.Linq; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Context; @@ -27,8 +28,8 @@ using Org.Apache.REEF.Utilities.Logging; using Xunit; using System.Threading; using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Common.Events; using Org.Apache.REEF.Common.Poison; +using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Implementations.Configuration; @@ -43,10 +44,11 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant private const string Prefix = "Poison: "; private const string FailedEvaluatorMessage = "I have succeeded in seeing a failed evaluator."; private const string TaskId = "1234567"; + private const string ContextId = "ContextID"; [Fact] [Trait("Description", "Test Poison functionality by injecting fault in context start handler.")] - public void TestPoisonedEvaluatorStartHanlder() + public void TestPoisonedEvaluatorStartHandler() { string testFolder = DefaultRuntimeFolder + TestId; TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 1, "poisonedEvaluatorStartTest", "local", testFolder); @@ -87,45 +89,67 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant public void OnNext(IAllocatedEvaluator value) { - var c1 = ContextConfiguration.ConfigurationModule - .Set(ContextConfiguration.Identifier, "ContextID") - .Set(ContextConfiguration.OnContextStart, GenericType<PoisonedEventHandler<IContextStart>>.Class) - .Build(); - - var c2 = TangFactory.GetTang().NewConfigurationBuilder() - .BindIntNamedParam<CrashTimeout>("500") - .BindIntNamedParam<CrashMinDelay>("100") - .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0") - .Build(); - - value.SubmitContext(Configurations.Merge(c1, c2)); + 
value.SubmitContext(ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, ContextId) + .Build()); } public void OnNext(IActiveContext value) { - value.SubmitTask(TaskConfiguration.ConfigurationModule + var taskConfig = TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, TaskId) .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class) - .Build()); + .Set(TaskConfiguration.OnTaskStart, GenericType<PoisonedEventHandler<ITaskStart>>.Class) + .Build(); + + var poisonConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindIntNamedParam<CrashTimeout>("500") + .BindIntNamedParam<CrashMinDelay>("100") + .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0") + .Build(); + + value.SubmitTask(Configurations.Merge(taskConfig, poisonConfig)); } public void OnNext(IFailedEvaluator value) { Logger.Log(Level.Error, FailedEvaluatorMessage); - if (value.FailedTask.Value == null) + if (value.FailedTask.Value == null || !value.FailedTask.IsPresent()) + { + throw new Exception("No failed Task associated with failed Evaluator"); + } + + if (value.FailedTask.Value.Id != TaskId) + { + throw new Exception("Failed Task ID returned " + value.FailedTask.Value.Id + + ", was expecting Task ID " + TaskId); + } + + Logger.Log(Level.Info, "Received all expected failed Tasks."); + + const string expectedStr = "expected a single Context with Context ID " + ContextId + "."; + + if (value.FailedContexts == null) { - // TODO[JIRA REEF-1343]: fail the test if there's no failed task - Logger.Log(Level.Error, "No failed task associated with failed evaluator"); + throw new Exception("No Context was present but " + expectedStr); } - else + + if (value.FailedContexts.Count != 1) { - Logger.Log(Level.Error, "Failed task id '" + value.FailedTask.Value.Id + "'"); + throw new Exception("Collection of failed Contexts contains " + value.FailedContexts.Count + " failed Contexts but only " + expectedStr); } + + if 
(!value.FailedContexts.Select(ctx => ctx.Id).Contains(ContextId)) + { + throw new Exception("Collection of failed Contexts does not contain expected Context ID " + ContextId + "."); + } + + Logger.Log(Level.Info, "Received all expected failed Contexts."); } + public void OnNext(ICompletedTask value) { - // TODO[JIRA REEF-1343]: fail the test if receive ICompletedTask after failed evaluator - Logger.Log(Level.Info, "ICompletedTask"); + throw new Exception("A completed task was not expected."); } public void OnError(Exception error) http://git-wip-us.apache.org/repos/asf/reef/blob/be87fe58/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs index 98be51b..8c14c51 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs @@ -21,6 +21,7 @@ using System.Linq; using System.Threading; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.InjectionPlan; +using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Collections; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; @@ -151,29 +152,43 @@ namespace Org.Apache.REEF.Wake.Time.Runtime public void Run() { SubscribeHandlers(); - _handlers.OnNext(new RuntimeStart(_timer.CurrentTime)); - _handlers.OnNext(new StartTime(_timer.CurrentTime)); - while (true) + var runtimeException = Optional<Exception>.Empty(); + try { - lock (_schedule) + _handlers.OnNext(new RuntimeStart(_timer.CurrentTime)); + _handlers.OnNext(new StartTime(_timer.CurrentTime)); + + while (true) { - if (IsIdle()) + lock (_schedule) { - _handlers.OnNext(new IdleClock(_timer.CurrentTime)); - } - - // Blocks and releases lock until it receives the next event - Time alarm = GetNextEvent(); - ProcessEvent(alarm); 
+ if (IsIdle()) + { + _handlers.OnNext(new IdleClock(_timer.CurrentTime)); + } - if (alarm is StopTime) - { - break; + // Blocks and releases lock until it receives the next event + Time alarm = GetNextEvent(); + ProcessEvent(alarm); + + if (alarm is StopTime) + { + break; + } } } } - _handlers.OnNext(new RuntimeStop(_timer.CurrentTime)); + catch (Exception e) + { + runtimeException = Optional<Exception>.Of(new SystemException("Caught Exception in clock, failing the Evaluator.", e)); + } + + var runtimeStop = runtimeException.IsPresent() + ? new RuntimeStop(_timer.CurrentTime, runtimeException.Value) + : new RuntimeStop(_timer.CurrentTime); + + _handlers.OnNext(runtimeStop); } /// <summary>
