Repository: reef Updated Branches: refs/heads/master 798f2dbb0 -> d6dbea6df
[REEF-1428] Validate Task Stop failure => FailedTask Event This addressed the issue by * Writing a test to validate that a Task Stop failure triggers a FailedTask Event. * Adding helper test classes. * Moving the call to TaskStop prior to setting the result of the Task. * Inject all ITaskStart and ITaskStop. JIRA: [REEF-1428](https://issues.apache.org/jira/browse/REEF-1428) This closes #1038 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d6dbea6d Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d6dbea6d Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d6dbea6d Branch: refs/heads/master Commit: d6dbea6dfcced497c699e6d055015f72cf102c68 Parents: 798f2db Author: Andrew Chung <[email protected]> Authored: Tue Jun 7 16:11:08 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Thu Jun 9 10:48:19 2016 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 1 + .../Runtime/Evaluator/Context/ContextManager.cs | 1 - .../Runtime/Evaluator/Task/TaskLifeCycle.cs | 44 ++-- .../Runtime/Evaluator/Task/TaskRuntime.cs | 11 +- .../Runtime/Evaluator/Task/TaskStatus.cs | 13 +- .../Evaluator/Task/TaskStopHandlerException.cs | 31 +++ .../Functional/Bridge/TestContextStack.cs | 14 +- .../Task/Handlers/ExceptionThrowingHandler.cs | 58 ++++++ .../Common/Task/Handlers/LoggingHandler.cs | 53 +++++ .../Functional/Common/Task/LoggingTask.cs | 48 +++++ .../Failure/User/TaskStopExceptionTest.cs | 201 +++++++++++++++++++ .../Functional/ReefFunctionalTest.cs | 16 +- .../Org.Apache.REEF.Tests.csproj | 4 + 13 files changed, 448 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 6a64571..3058b91 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -166,6 +166,7 @@ under the License. <Compile Include="Runtime\Evaluator\Task\TaskStartImpl.cs" /> <Compile Include="Runtime\Evaluator\Task\TaskState.cs" /> <Compile Include="Runtime\Evaluator\Task\TaskStatus.cs" /> + <Compile Include="Runtime\Evaluator\Task\TaskStopHandlerException.cs" /> <Compile Include="Runtime\Evaluator\Task\TaskStopImpl.cs" /> <Compile Include="Runtime\Evaluator\Utils\NamedparameterAlias.cs" /> <Compile Include="runtime\MachineStatus.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index 3f7c5ae..1387d51 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -350,7 +350,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// <param name="e"></param> private void HandleTaskException(TaskClientCodeException e) { - LOGGER.Log(Level.Error, "TaskClientCodeException", e); byte[] error; try { http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs index 115f4d2..687cb9c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Threading; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Tang.Annotations; @@ -29,8 +30,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { private readonly IReadOnlyCollection<IObserver<ITaskStop>> _taskStopHandlers; private readonly IReadOnlyCollection<IObserver<ITaskStart>> _taskStartHandlers; - private readonly Optional<ITaskStart> _taskStart; - private readonly Optional<ITaskStop> _taskStop; + private readonly ITaskStart _taskStart; + private readonly ITaskStop _taskStop; + + private int _startHasBeenInvoked = 0; + private int _stopHasBeenInvoked = 0; [Inject] private TaskLifeCycle( @@ -38,15 +42,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task [Parameter(typeof(TaskConfigurationOptions.StopHandlers))] ISet<IObserver<ITaskStop>> taskStopHandlers, ITaskStart taskStart, ITaskStop taskStop) - : this(taskStartHandlers, taskStopHandlers, Optional<ITaskStart>.Of(taskStart), Optional<ITaskStop>.Of(taskStop)) - { - } - - private TaskLifeCycle( - IEnumerable<IObserver<ITaskStart>> taskStartHandlers, - IEnumerable<IObserver<ITaskStop>> taskStopHandlers, - Optional<ITaskStart> taskStart, - Optional<ITaskStop> taskStop) { _taskStartHandlers = new ReadOnlySet<IObserver<ITaskStart>>(taskStartHandlers); _taskStopHandlers = new ReadOnlySet<IObserver<ITaskStop>>(taskStopHandlers); @@ -56,27 +51,30 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void Start() { - if (!_taskStart.IsPresent()) + if (Interlocked.Exchange(ref _startHasBeenInvoked, 1) == 0) { - return; - } - - foreach (var startHandler in _taskStartHandlers) - { - startHandler.OnNext(_taskStart.Value); + foreach (var startHandler in _taskStartHandlers) + { + startHandler.OnNext(_taskStart); + } } } public void Stop() { - if (!_taskStop.IsPresent()) + try { - return; + if (Interlocked.Exchange(ref _stopHasBeenInvoked, 1) == 0) + { + foreach (var stopHandler in _taskStopHandlers) + { + stopHandler.OnNext(_taskStop); + } + } } - - foreach (var stopHandler in _taskStopHandlers) + catch (Exception e) { - stopHandler.OnNext(_taskStop.Value); + throw new TaskStopHandlerException("Encountered Exception on TaskStopHandler.", e); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 6297d2e..8a13d63 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -110,6 +110,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); } } + catch (TaskStopHandlerException e) + { + _currentStatus.SetException(e.InnerException); + } catch (Exception e) { _currentStatus.SetException(e); @@ -156,11 +160,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void Close(byte[] message) { - Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId)); + Logger.Log(Level.Info, "Trying to close Task {0}", TaskId); if (_currentStatus.IsNotRunning()) { - Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State)); + Logger.Log(Level.Warning, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State); return; } try @@ -171,8 +175,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task catch (Exception e) { Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", Logger); - _currentStatus.SetException(TaskClientCodeException.Create( - TaskId, ContextId, "Error during Close().", e)); + _currentStatus.SetException(e); } } http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index 43eb55e..0347033 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -100,16 +100,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { try { - if (HasEnded()) - { - // Note that this is possible if the job is already DONE, but a - // Task Close is triggered prior to the DONE signal propagates to the - // Driver. If the Task Close handler is not implemented, the Handler will - // mark the Task with an Exception, although for all intents and purposes - // the Task is already done and should not be affected. - return; - } - if (!_lastException.IsPresent()) { _lastException = Optional<Exception>.Of(e); @@ -130,6 +120,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task lock (_heartBeatManager) { _result = Optional<byte[]>.OfNullable(result); + _taskLifeCycle.Stop(); + switch (State) { case TaskState.SuspendRequested: @@ -140,7 +132,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task State = TaskState.Done; break; } - _taskLifeCycle.Stop(); Heartbeat(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopHandlerException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopHandlerException.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopHandlerException.cs new file mode 100644 index 0000000..f86605d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopHandlerException.cs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task +{ + /// <summary> + /// An Exception that indicates that the TaskStopHandlers have triggered an Exception. + /// </summary> + internal sealed class TaskStopHandlerException : Exception + { + internal TaskStopHandlerException(string message, Exception inner) : base(message, inner) + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs index bfc91c1..f9a16dd 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs @@ -106,8 +106,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { case ContextOneId: var contextConfig = - Common.Context.ContextConfiguration.ConfigurationModule.Set( - Common.Context.ContextConfiguration.Identifier, ContextTwoId) + REEF.Common.Context.ContextConfiguration.ConfigurationModule.Set( + REEF.Common.Context.ContextConfiguration.Identifier, ContextTwoId) .Build(); var stackingContextConfig = TangFactory.GetTang() @@ -193,9 +193,9 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { case ContextOneId: var contextConfig = - Common.Context.ContextConfiguration.ConfigurationModule - .Set(Common.Context.ContextConfiguration.Identifier, ContextTwoId) - .Set(Common.Context.ContextConfiguration.OnContextStart, GenericType<TestContextStackContextStartHandler>.Class) + REEF.Common.Context.ContextConfiguration.ConfigurationModule + .Set(REEF.Common.Context.ContextConfiguration.Identifier, ContextTwoId) + .Set(REEF.Common.Context.ContextConfiguration.OnContextStart, GenericType<TestContextStackContextStartHandler>.Class) .Build(); var stackingContextConfig = @@ -287,8 +287,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge public void OnNext(IAllocatedEvaluator value) { - value.SubmitContext(Common.Context.ContextConfiguration.ConfigurationModule - .Set(Common.Context.ContextConfiguration.Identifier, ContextOneId) + value.SubmitContext(REEF.Common.Context.ContextConfiguration.ConfigurationModule + .Set(REEF.Common.Context.ContextConfiguration.Identifier, ContextOneId) .Build()); } http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs new file mode 100644 index 0000000..6e66dac --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Org.Apache.REEF.Tests.Functional.Common.Task.Handlers +{ + /// <summary> + /// A helper test class that implements <see cref="IObserver{T}"/>, which throws an + /// Exception after executing an optional Action provided by the caller of the constructor. + /// </summary> + internal abstract class ExceptionThrowingHandler<T> : IObserver<T> + { + private readonly Exception _exceptionToThrow; + private readonly Action<T> _action; + + protected ExceptionThrowingHandler( + Exception exceptionToThrow, Action<T> action = null) + { + _exceptionToThrow = exceptionToThrow; + _action = action; + } + + public void OnNext(T value) + { + if (_action != null) + { + _action(value); + } + + throw _exceptionToThrow; + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/LoggingHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/LoggingHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/LoggingHandler.cs new file mode 100644 index 0000000..4133310 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/LoggingHandler.cs @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Common.Task.Handlers +{ + /// <summary> + /// A helper test class that implements <see cref="IObserver{T}"/>, which logs + /// a message provided by the caller of the constructor. + /// </summary> + public abstract class LoggingHandler<T> : IObserver<T> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(LoggingHandler<>)); + + private readonly string _messageToLog; + + protected LoggingHandler(string messageToLog) + { + _messageToLog = messageToLog; + } + + public void OnNext(T value) + { + Logger.Log(Level.Info, _messageToLog); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/LoggingTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/LoggingTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/LoggingTask.cs new file mode 100644 index 0000000..04ba961 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/LoggingTask.cs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Common.Task +{ + /// <summary> + /// A helper test class that implements <see cref="ITask"/>, which logs + /// a message provided by the caller of the constructor. + /// </summary> + public abstract class LoggingTask : ITask + { + private static readonly Logger Logger = Logger.GetLogger(typeof(LoggingTask)); + + private readonly string _messageToLog; + + protected LoggingTask(string messageToLog) + { + _messageToLog = messageToLog; + } + + public void Dispose() + { + } + + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, _messageToLog); + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs new file mode 100644 index 0000000..84aba1f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional.Common.Task; +using Org.Apache.REEF.Tests.Functional.Common.Task.Handlers; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Failure.User +{ + /// <summary> + /// This test class contains a test that validates that an Exception in the + /// TaskStopHandler causes a FailedTask event in the Driver. + /// </summary> + [Collection("FunctionalTests")] + public sealed class TaskStopExceptionTest : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskStopExceptionTest)); + + private const string TaskStopExceptionMessage = "TaskStopExceptionMessage"; + private const string InitialTaskMessage = "InitialTaskMessage"; + private const string ResubmitTaskMessage = "ResubmitTaskMessage"; + private const string FailedTaskReceived = "FailedTaskReceived"; + private const string CompletedTaskReceived = "CompletedTaskReceived"; + + /// <summary> + /// This test validates that an Exception in the TaskStopHandler causes a FailedTask + /// event in the Driver, and that a new Task can be submitted on the original Context. + /// </summary> + [Fact] + public void TestStopTaskWithExceptionOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<TaskStopExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TaskStopExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<TaskStopExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<TaskStopExceptionTestDriver>.Class) + .Build(), typeof(TaskStopExceptionTestDriver), 1, "testStopTaskWithExceptionOnLocalRuntime", "local", testFolder); + + var driverMessages = new List<string> + { + FailedTaskReceived, + CompletedTaskReceived + }; + + ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, testFolder, 1); + ValidateMessageSuccessfullyLogged(driverMessages, "driver", DriverStdout, testFolder, 1); + + var evaluatorMessages = new List<string> { InitialTaskMessage, ResubmitTaskMessage }; + ValidateMessageSuccessfullyLogged(evaluatorMessages, "Node-*", EvaluatorStdout, testFolder, 1); + CleanUp(testFolder); + } + + private sealed class TaskStopExceptionTestDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<ICompletedTask>, + IObserver<IFailedTask> + { + private readonly IEvaluatorRequestor _requestor; + + [Inject] + private TaskStopExceptionTestDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + // submit the first Task. + value.SubmitTask(TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID") + .Set(TaskConfiguration.Task, GenericType<TaskStopExceptionTask>.Class) + .Set(TaskConfiguration.OnTaskStop, GenericType<TaskStopHandlerWithException>.Class) + .Build()); + } + + public void OnNext(ICompletedTask value) + { + // Should only receive one CompletedTask, as validated. + Logger.Log(Level.Info, CompletedTaskReceived); + value.ActiveContext.Dispose(); + } + + public void OnNext(IFailedTask value) + { + // Check that Exceptions are deserialized correctly. + var ex = value.AsError(); + if (ex == null) + { + throw new Exception("Exception was not expected to be null."); + } + + var taskStopEx = ex as TaskStopExceptionTestException; + + if (taskStopEx == null) + { + throw new Exception("Expected Exception to be of type TaskStopExceptionTestException, but instead got type " + ex.GetType().Name); + } + + if (taskStopEx.Message != TaskStopExceptionMessage) + { + throw new Exception( + "Expected message to be " + TaskStopExceptionMessage + " but instead got " + taskStopEx.Message + "."); + } + + Logger.Log(Level.Info, FailedTaskReceived); + + // Submit the new Task to verify that the original Context accepts new Tasks. + value.GetActiveContext().Value.SubmitTask( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID") + .Set(TaskConfiguration.Task, GenericType<TaskStopExceptionResubmitTask>.Class) + .Build()); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private sealed class TaskStopExceptionTask : LoggingTask + { + [Inject] + private TaskStopExceptionTask() + : base(InitialTaskMessage) + { + } + } + + private sealed class TaskStopExceptionResubmitTask : LoggingTask + { + [Inject] + private TaskStopExceptionResubmitTask() + : base(ResubmitTaskMessage) + { + } + } + + private sealed class TaskStopHandlerWithException : ExceptionThrowingHandler<ITaskStop> + { + [Inject] + private TaskStopHandlerWithException() : + base(new TaskStopExceptionTestException(TaskStopExceptionMessage)) + { + } + } + + /// <summary> + /// A Serializable Exception to verify that the Exception is deserialized correctly. + /// </summary> + [Serializable] + private sealed class TaskStopExceptionTestException : Exception + { + public TaskStopExceptionTestException(string message) : base(message) + { + } + + private TaskStopExceptionTestException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index d71f20d..149cc9b 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -196,6 +196,19 @@ namespace Org.Apache.REEF.Tests.Functional } /// <summary> + /// See <see cref="ValidateMessageSuccessfullyLogged"/> for detail. This function is <see cref="ValidateMessageSuccessfullyLogged"/> + /// for the driver log. + /// </summary> + protected void ValidateMessagesSuccessfullyLoggedForDriver( + IEnumerable<string> messages, + string testFolder, + int numberOfOccurrences = 1) + { + var msgs = new List<string>(messages); + ValidateMessageSuccessfullyLogged(msgs, "driver", DriverStdout, testFolder, numberOfOccurrences); + } + + /// <summary> /// Validates that each of the message provided in the <see cref="messages"/> parameter occurs /// some number of times. /// If <see cref="numberOfOccurrences"/> is greater than or equal to 0, validates that each of the message in @@ -203,7 +216,8 @@ namespace Org.Apache.REEF.Tests.Functional /// If <see cref="numberOfOccurrences"/> is less than 0, validates that each of the message in <see cref="messages"/> /// occur at least once. /// </summary> - protected void ValidateMessageSuccessfullyLogged(IList<string> messages, string subfolder, string fileName, string testFolder, int numberOfOccurrences = 1) + protected void ValidateMessageSuccessfullyLogged( + IEnumerable<string> messages, string subfolder, string fileName, string testFolder, int numberOfOccurrences = 1) { string[] lines = ReadLogFile(fileName, subfolder, testFolder); foreach (string message in messages) http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 1c6949a..0746e12 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -86,6 +86,9 @@ under the License. <Compile Include="Functional\Bridge\TestSimpleEventHandlers.cs" /> <Compile Include="Functional\Bridge\TestSuspendTask.cs" /> <Compile Include="Functional\Bridge\TestUnhandledTaskException.cs" /> + <Compile Include="Functional\Common\Task\Handlers\LoggingHandler.cs" /> + <Compile Include="Functional\Common\Task\LoggingTask.cs" /> + <Compile Include="Functional\Common\Task\Handlers\ExceptionThrowingHandler.cs" /> <Compile Include="Functional\Driver\DriverTestStartHandler.cs" /> <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithActiveContextDriver.cs" /> <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithRunningTaskDriver.cs" /> @@ -97,6 +100,7 @@ under the License. <Compile Include="Functional\Failure\TestEvaluatorWithRunningTaskImmediatePoison.cs" /> <Compile Include="Functional\Failure\SleepTask.cs" /> <Compile Include="Functional\Failure\User\TaskConstructorExceptionTest.cs" /> + <Compile Include="Functional\Failure\User\TaskStopExceptionTest.cs" /> <Compile Include="Functional\FaultTolerant\TestContextStart.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" />
