Repository: reef Updated Branches: refs/heads/master 07e654977 -> bcf7ff881
[REEF-1424] Validate Task StartHandler failure => FailedTask Event This addressed the issue by * Concentrating Task Exception logic handling into TaskRuntime.cs instead of in TaskLifeCycle or TaskStatus. * Adding TaskStartExceptionTest. * Adding TaskStartHandlerException to indicate that an Exception happened in the TaskStartHandler. JIRA: [REEF-1424](https://issues.apache.org/jira/browse/REEF-1424) This closes #1043 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bcf7ff88 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bcf7ff88 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bcf7ff88 Branch: refs/heads/master Commit: bcf7ff8813f867eb67c1b713569f19e0939341e5 Parents: 07e6549 Author: Andrew Chung <[email protected]> Authored: Fri Jun 10 11:25:52 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Thu Jun 16 19:48:11 2016 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 1 + .../Runtime/Evaluator/Task/TaskLifeCycle.cs | 15 +- .../Runtime/Evaluator/Task/TaskRuntime.cs | 4 + .../Evaluator/Task/TaskStartHandlerException.cs | 28 +++ .../Runtime/Evaluator/Task/TaskStatus.cs | 28 +-- .../Failure/User/TaskStartExceptionTest.cs | 207 +++++++++++++++++++ .../Failure/User/TaskStopExceptionTest.cs | 1 - .../Org.Apache.REEF.Tests.csproj | 1 + 8 files changed, 258 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 3058b91..3582f00 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -163,6 +163,7 @@ under the License. <Compile Include="Runtime\Evaluator\Task\TaskClientCodeException.cs" /> <Compile Include="Runtime\Evaluator\Task\TaskLifeCycle.cs" /> <Compile Include="Runtime\Evaluator\Task\TaskRuntime.cs" /> + <Compile Include="Runtime\Evaluator\Task\TaskStartHandlerException.cs" /> <Compile Include="Runtime\Evaluator\Task\TaskStartImpl.cs" /> <Compile Include="Runtime\Evaluator\Task\TaskState.cs" /> <Compile Include="Runtime\Evaluator\Task\TaskStatus.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs index 687cb9c..9eb9ea5 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs @@ -51,13 +51,20 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void Start() { - if (Interlocked.Exchange(ref _startHasBeenInvoked, 1) == 0) + try { - foreach (var startHandler in _taskStartHandlers) + if (Interlocked.Exchange(ref _startHasBeenInvoked, 1) == 0) { - startHandler.OnNext(_taskStart); + foreach (var startHandler in _taskStartHandlers) + { + startHandler.OnNext(_taskStart); + } } } + catch (Exception e) + { + throw new TaskStartHandlerException("Encountered Exception in TaskStartHandler.", e); + } } public void Stop() @@ -74,7 +81,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } catch (Exception e) { - throw new TaskStopHandlerException("Encountered Exception on TaskStopHandler.", e); + throw new TaskStopHandlerException("Encountered Exception in TaskStopHandler.", e); } } } 
http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 8a13d63..b14e3b9 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -110,6 +110,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); } } + catch (TaskStartHandlerException e) + { + _currentStatus.SetException(e.InnerException); + } catch (TaskStopHandlerException e) { _currentStatus.SetException(e.InnerException); http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartHandlerException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartHandlerException.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartHandlerException.cs new file mode 100644 index 0000000..07e8b10 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartHandlerException.cs @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task +{ + internal sealed class TaskStartHandlerException : Exception + { + internal TaskStartHandlerException(string message, Exception inner) : base(message, inner) + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index 0347033..5e4ca2f 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -143,17 +143,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task LOGGER.Log(Level.Verbose, "TaskStatus::SetInit"); if (_state == TaskState.Init) { - try - { - _taskLifeCycle.Start(); - LOGGER.Log(Level.Info, "Sending task INIT heartbeat"); - Heartbeat(); - } - catch (Exception e) - { - Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Cannot set task status to INIT.", LOGGER); - SetException(e); - } + LOGGER.Log(Level.Verbose, "Sending task INIT heartbeat"); + Heartbeat(); } } } @@ -165,17 +156,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning"); if (_state == TaskState.Init) { - try - { - State = TaskState.Running; - LOGGER.Log(Level.Info, "Sending task Running heartbeat"); - 
Heartbeat(); - } - catch (Exception e) - { - Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Cannot set task status to running.", LOGGER); - SetException(e); - } + _taskLifeCycle.Start(); + State = TaskState.Running; + LOGGER.Log(Level.Verbose, "Sending task Running heartbeat"); + Heartbeat(); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStartExceptionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStartExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStartExceptionTest.cs new file mode 100644 index 0000000..a218aef --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStartExceptionTest.cs @@ -0,0 +1,207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional.Common.Task; +using Org.Apache.REEF.Tests.Functional.Common.Task.Handlers; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Failure.User +{ + /// <summary> + /// This test class contains a test that validates that an Exception in + /// a TaskStartHandler triggers an <see cref="IFailedTask"/> event. + /// </summary> + [Collection("FunctionalTests")] + public sealed class TaskStartExceptionTest : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskStartExceptionTest)); + + private const string TaskStartExceptionMessage = "TaskStartExceptionMessage"; + private const string InitialTaskMessage = "InitialTaskMessage"; + private const string ResubmitTaskMessage = "ResubmitTaskMessage"; + private const string FailedTaskReceived = "FailedTaskReceived"; + private const string CompletedTaskReceived = "CompletedTaskReceived"; + + /// <summary> + /// This test validates that an Exception in the TaskStartHandler causes a FailedTask + /// event in the Driver, and that a new Task can be submitted on the original Context. 
+ /// </summary> + [Fact] + public void TestStartTaskWithExceptionOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<TaskStartExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TaskStartExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<TaskStartExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<TaskStartExceptionTestDriver>.Class) + .Build(), typeof(TaskStartExceptionTestDriver), 1, "testStartTaskWithExceptionOnLocalRuntime", "local", testFolder); + + var driverMessages = new List<string> + { + FailedTaskReceived, + CompletedTaskReceived + }; + + ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, testFolder, 1); + + // Validate that the first Task never starts. + ValidateMessageSuccessfullyLogged( + new List<string> { InitialTaskMessage }, "Node-*", EvaluatorStdout, testFolder, 0); + + ValidateMessageSuccessfullyLogged( + new List<string> { ResubmitTaskMessage }, "Node-*", EvaluatorStdout, testFolder, 1); + + CleanUp(testFolder); + } + + private sealed class TaskStartExceptionTestDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<ICompletedTask>, + IObserver<IFailedTask> + { + private readonly IEvaluatorRequestor _requestor; + + [Inject] + private TaskStartExceptionTestDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + // submit the first Task.
+ value.SubmitTask(TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID") + .Set(TaskConfiguration.Task, GenericType<TaskStartExceptionTask>.Class) + .Set(TaskConfiguration.OnTaskStart, GenericType<TaskStartHandlerWithException>.Class) + .Build()); + } + + public void OnNext(ICompletedTask value) + { + // Should only receive one CompletedTask, as validated. + Logger.Log(Level.Info, CompletedTaskReceived); + value.ActiveContext.Dispose(); + } + + public void OnNext(IFailedTask value) + { + // Check that Exceptions are deserialized correctly. + var ex = value.AsError(); + if (ex == null) + { + throw new Exception("Exception was not expected to be null."); + } + + var taskStartEx = ex as TaskStartExceptionTestException; + + Assert.True(taskStartEx != null, "Expected Exception to be of type TaskStartExceptionTestException, but instead got type " + ex.GetType().Name); + Assert.True(taskStartEx.Message.Equals(TaskStartExceptionMessage), + "Expected message to be " + TaskStartExceptionMessage + " but instead got " + taskStartEx.Message + "."); + + Logger.Log(Level.Info, FailedTaskReceived); + + // Submit the new Task to verify that the original Context accepts new Tasks. + value.GetActiveContext().Value.SubmitTask( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID") + .Set(TaskConfiguration.Task, GenericType<TaskStartExceptionResubmitTask>.Class) + .Build()); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private sealed class TaskStartExceptionTask : LoggingTask + { + [Inject] + private TaskStartExceptionTask() + : base(InitialTaskMessage) + { + } + } + + /// <summary> + /// A simple Task for Task resubmission validation on a Context with a previous Task + /// that failed on a Task StartHandler. 
+ /// </summary> + private sealed class TaskStartExceptionResubmitTask : LoggingTask + { + [Inject] + private TaskStartExceptionResubmitTask() + : base(ResubmitTaskMessage) + { + } + } + + /// <summary> + /// Throws a test Exception on Task Start to trigger a task failure. + /// </summary> + private sealed class TaskStartHandlerWithException : ExceptionThrowingHandler<ITaskStart> + { + [Inject] + private TaskStartHandlerWithException() : + base(new TaskStartExceptionTestException(TaskStartExceptionMessage)) + { + } + } + + /// <summary> + /// A Serializable Exception to verify that the Exception is deserialized correctly. + /// </summary> + [Serializable] + private sealed class TaskStartExceptionTestException : Exception + { + public TaskStartExceptionTestException(string message) + : base(message) + { + } + + private TaskStartExceptionTestException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs index 84aba1f..da2ba9e 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs @@ -69,7 +69,6 @@ namespace Org.Apache.REEF.Tests.Functional.Failure.User }; ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, testFolder, 1); - ValidateMessageSuccessfullyLogged(driverMessages, "driver", DriverStdout, testFolder, 1); var evaluatorMessages = new List<string> { InitialTaskMessage, ResubmitTaskMessage }; ValidateMessageSuccessfullyLogged(evaluatorMessages, "Node-*", EvaluatorStdout, testFolder, 1); 
http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 997d05d..6dde363 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -88,6 +88,7 @@ under the License. <Compile Include="Functional\Common\Task\Handlers\LoggingHandler.cs" /> <Compile Include="Functional\Common\Task\LoggingTask.cs" /> <Compile Include="Functional\Common\Task\Handlers\ExceptionThrowingHandler.cs" /> + <Compile Include="Functional\Failure\User\TaskStartExceptionTest.cs" /> <Compile Include="Functional\Failure\User\UnhandledThreadExceptionInTaskTest.cs" /> <Compile Include="Functional\Driver\DriverTestStartHandler.cs" /> <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithActiveContextDriver.cs" />
