Repository: reef Updated Branches: refs/heads/master 641cb59c0 -> 4ab6a8d1b
[REEF-1447] Validate Task close failure => FailedTask Event (task has not yet finished) This addresses the issue by * Failing the Evaluator on an `Exception` in `TaskCloseHandler`. * Adding a test for close event failure while the Task is still running, validating that the `TaskCloseHandler` `Exception` triggers a `FailedEvaluator`. * Modifying `TaskRuntime` and related tests to verify that the `TaskStopHandler` is not called upon `Task` failure. JIRA: [REEF-1447](https://issues.apache.org/jira/browse/REEF-1447) This closes #1042 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4ab6a8d1 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4ab6a8d1 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4ab6a8d1 Branch: refs/heads/master Commit: 4ab6a8d1beb4016848174bbecf82782e7145a35f Parents: 641cb59 Author: Andrew Chung <[email protected]> Authored: Thu Jun 9 16:59:36 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Fri Jul 1 14:24:36 2016 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/Task/TaskRuntime.cs | 11 +- .../Runtime/Evaluator/Task/TaskStatus.cs | 26 +-- .../TaskRuntimeTests.cs | 7 +- .../Functional/Bridge/TestCloseTask.cs | 103 ++--------- .../Functional/Bridge/TestDisposeTasks.cs | 26 ++- .../Functional/Common/EventHandle.cs | 59 +++++++ .../Task/Handlers/ExceptionThrowingHandler.cs | 19 +- .../Failure/User/TaskCloseExceptionTest.cs | 173 +++++++++++++++++++ .../FaultTolerant/TestResubmitTask.cs | 26 ++- .../Org.Apache.REEF.Tests.csproj | 3 +- 10 files changed, 302 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index a6480e8..2feccd2 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -41,6 +41,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task private readonly IInjectionFuture<IObserver<ISuspendEvent>> _suspendHandlerFuture; private readonly IInjectionFuture<IObserver<ICloseEvent>> _closeHandlerFuture; private int _taskRan = 0; + private int _taskClosed = 0; [Inject] private TaskRuntime( @@ -165,6 +166,13 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void Close(byte[] message) { Logger.Log(Level.Info, "Trying to close Task {0}", TaskId); + if (Interlocked.Exchange(ref _taskClosed, 1) != 0) + { + // Return if we have already called close. This can happen when TaskCloseHandler + // is invoked and throws an Exception before the Task is completed. The control flows + // to failing the Evaluator, which eventually tries to close the Task again on Dispose. 
+ return; + } if (_currentStatus.IsNotRunning()) { @@ -178,8 +186,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } catch (Exception e) { - Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", Logger); - _currentStatus.SetException(e); + Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Error during Close.", Logger); } finally { http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index 5e4ca2f..a195333 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -98,20 +98,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { lock (_heartBeatManager) { - try - { - if (!_lastException.IsPresent()) - { - _lastException = Optional<Exception>.Of(e); - } - - State = TaskState.Failed; - _taskLifeCycle.Stop(); - } - finally - { - Heartbeat(); - } + _lastException = Optional<Exception>.Of(e); + State = TaskState.Failed; + Heartbeat(); } } @@ -120,6 +109,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task lock (_heartBeatManager) { _result = Optional<byte[]>.OfNullable(result); + + // This can throw an Exception. _taskLifeCycle.Stop(); switch (State) @@ -132,6 +123,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task State = TaskState.Done; break; } + Heartbeat(); } } @@ -354,9 +346,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { if (_result.IsPresent() && _lastException.IsPresent()) { - LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. 
Thrown away result:" + ByteUtilities.ByteArraysToString(_result.Value)); - State = TaskState.Failed; - _result = Optional<byte[]>.Empty(); + throw new ApplicationException( + string.Format("Both Exception and Result are present. One of the Threads have already sent a result back." + + "Result returned [{0}]. Exception was [{1}]. Failing the Evaluator.", _result.Value, _lastException.Value)); } } http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs index 5e673e5..c8078b0 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs @@ -206,7 +206,7 @@ namespace Org.Apache.REEF.Evaluator.Tests /// <summary> /// Tests whether task start and stop handlers are properly instantiated and invoked - /// on the failure of a task. + /// on the failure of a task. On failure, TaskStop handler should not be invoked. 
/// </summary> [Fact] public void TestTaskEventsOnFailure() @@ -232,8 +232,7 @@ namespace Org.Apache.REEF.Evaluator.Tests throw new Exception("Event handler is not expected to be null."); } - Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent()); - Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId); + Assert.False(testTaskEventStopHandler.StopInvoked.IsPresent()); taskThread.Join(); } @@ -337,7 +336,7 @@ namespace Org.Apache.REEF.Evaluator.Tests throw new Exception("Event handler is not expected to be null."); } - Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent()); + Assert.False(testTaskEventStopHandler.StopInvoked.IsPresent()); Assert.Equal(taskRuntime.GetTaskState(), TaskState.Failed); taskRuntime.Suspend(null); http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs index a4b28cc..d50489c 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs @@ -98,7 +98,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForEnforceToCloseTask()), typeof(CloseTaskTestDriver), 1, "TestEnforceCloseTask", "local", testFolder); - ValidateSuccessForLocalRuntime(1, 1, 0, testFolder); + ValidateSuccessForLocalRuntime(0, 0, 1, testFolder); ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 0); var messages = new List<string>(); messages.Add(DisposeMessageFromDriver); @@ -108,29 +108,6 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } /// <summary> - /// This test is to close a 
running task with exception throw in close handler - /// Expect to receive Exception in Failed Task event handler in driver - /// </summary> - [Fact] - public void TestStopTaskWithExceptionOnLocalRuntime() - { - const string successIndication = "EXIT: ActiveContextClr2Java::Close"; - const string failedTaskIndication = "Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext"; - - string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); - TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForFailToCloseTask()), typeof(CloseTaskTestDriver), 1, "testStopTaskWithException", "local", testFolder); - var messages = new List<string>(); - messages.Add(successIndication); - messages.Add(failedTaskIndication); - ValidateMessageSuccessfullyLogged(messages, "driver", DriverStdout, testFolder, 1); - - var messages1 = new List<string>(); - messages1.Add(DisposeMessageFromDriver); - ValidateMessageSuccessfullyLogged(messages1, "Node-*", EvaluatorStdout, testFolder, 2); - CleanUp(testFolder); - } - - /// <summary> /// This test is to close a running task over the bridge without close handler bound /// Expect to get TaskCloseHandlerNotBoundException /// </summary> @@ -143,7 +120,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForNoCloseHandlerTask()), typeof(CloseTaskTestDriver), 1, "testStopTaskWithNoCloseHandler", "local", testFolder); var messages = new List<string>(); messages.Add(closeHandlerNoBound); - ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1); + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, -1); CleanUp(testFolder); } @@ -195,15 +172,6 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge .Build(); } - private IConfiguration GetTaskConfigurationForFailToCloseTask() - { - return TaskConfiguration.ConfigurationModule - 
.Set(TaskConfiguration.Identifier, "TaskID-FailToClose") - .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByThrowExceptionTask>.Class) - .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseByThrowExceptionTask>.Class) - .Build(); - } - private IConfiguration GetTaskConfigurationForNoCloseHandlerTask() { return TaskConfiguration.ConfigurationModule @@ -225,6 +193,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge .Set(DriverConfiguration.OnTaskRunning, GenericType<CloseTaskTestDriver>.Class) .Set(DriverConfiguration.OnTaskCompleted, GenericType<CloseTaskTestDriver>.Class) .Set(DriverConfiguration.OnTaskFailed, GenericType<CloseTaskTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<CloseTaskTestDriver>.Class) .Build(); AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); @@ -252,6 +221,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge IObserver<IActiveContext>, IObserver<ICompletedTask>, IObserver<IFailedTask>, + IObserver<IFailedEvaluator>, IObserver<IRunningTask> { private readonly IEvaluatorRequestor _requestor; @@ -295,6 +265,13 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge value.ActiveContext.Dispose(); } + public void OnNext(IFailedEvaluator value) + { + Assert.True(value.FailedTask.IsPresent()); + Assert.Equal(value.FailedTask.Value.Id, "TaskID-EnforceToClose"); + Assert.Contains(TaskManager.TaskKilledByDriver, value.EvaluatorException.InnerException.Message); + } + public void OnNext(IFailedTask value) { var failedExeption = ByteUtilities.ByteArraysToString(value.Data.Value); @@ -308,11 +285,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { Assert.Contains(DefaultTaskCloseHandler.ExceptionMessage, failedExeption); } - if (value.Id.EndsWith("TaskID-EnforceToClose")) - { - Assert.Contains(TaskManager.TaskKilledByDriver, failedExeption); - } - + value.GetActiveContext().Value.Dispose(); } @@ -499,58 +472,6 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } /// 
<summary> - /// This is a test task for the scenario in which the task receives close event, instead of - /// let the task to return properly, it throws exception. - /// </summary> - private sealed class CloseByThrowExceptionTask : ITask, IObserver<ICloseEvent> - { - private readonly CountdownEvent _suspendSignal = new CountdownEvent(1); - - [Inject] - private CloseByThrowExceptionTask() - { - } - - public byte[] Call(byte[] memento) - { - Logger.Log(Level.Info, "Hello in FailtToCloseTask"); - _suspendSignal.Wait(); - return null; - } - - public void Dispose() - { - Logger.Log(Level.Info, "Task is disposed."); - } - - public void OnNext(ICloseEvent value) - { - try - { - if (value.Value != null && value.Value.Value != null) - { - Logger.Log(Level.Info, "Closed event received in task:" + Encoding.UTF8.GetString(value.Value.Value)); - Assert.Equal(Encoding.UTF8.GetString(value.Value.Value), DisposeMessageFromDriver); - } - } - finally - { - throw new Exception(FailToCloseTaskMessage); - } - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - } - - /// <summary> /// This task doesn't implement close handler. It is to test closeHandlerNoBound exception. 
/// </summary> private sealed class MissingCloseHandlerTask : ITask http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs index 7e8702c..f7ebd2c 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs @@ -20,7 +20,6 @@ using System.Collections.Generic; using System.Text; using System.Threading; using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Common.Runtime.Evaluator.Task; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Driver; @@ -32,12 +31,13 @@ using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions; using Org.Apache.REEF.Utilities.Logging; using Xunit; namespace Org.Apache.REEF.Tests.Functional.Bridge { + [Collection("FunctionalTests")] public class TestDisposeTasks : ReefFunctionalTest { private static readonly Logger Logger = Logger.GetLogger(typeof(TestDisposeTasks)); @@ -71,7 +71,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { string testFolder = DefaultRuntimeFolder + TestId; TestRun(DriverConfigurations(2), typeof(TestDisposeTasks), 1, "TestDisposeTasks", "local", testFolder); - ValidateSuccessForLocalRuntime(1, 1, 0, testFolder); + ValidateSuccessForLocalRuntime(0, 0, 1, testFolder); var messages = new List<string> { TaskIsDisposed }; ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1); CleanUp(testFolder); @@ -109,7 +109,7 @@ namespace 
Org.Apache.REEF.Tests.Functional.Bridge .Set(DriverConfiguration.OnContextActive, GenericType<DisposeTaskTestDriver>.Class) .Set(DriverConfiguration.OnTaskRunning, GenericType<DisposeTaskTestDriver>.Class) .Set(DriverConfiguration.OnTaskCompleted, GenericType<DisposeTaskTestDriver>.Class) - .Set(DriverConfiguration.OnTaskFailed, GenericType<DisposeTaskTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<DisposeTaskTestDriver>.Class) .Build(); return Configurations.Merge(taskIdConfig, driverConfig); @@ -123,7 +123,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<ICompletedTask>, - IObserver<IFailedTask>, + IObserver<IFailedEvaluator>, IObserver<IRunningTask> { private readonly IEvaluatorRequestor _requestor; @@ -167,20 +167,16 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge /// And verify the context associated with the failed task is the same as the context that the task was submitted /// </summary> /// <param name="value"></param> - public void OnNext(IFailedTask value) + public void OnNext(IFailedEvaluator value) { - Assert.Equal(TaskId + "2", value.Id); + Assert.True(value.FailedTask.IsPresent()); + Assert.Equal(TaskId + "2", value.FailedTask.Value.Id); - var failedException = ByteUtilities.ByteArraysToString(value.Data.Value); - var e = value.AsError(); + var e = value.EvaluatorException.InnerException; Logger.Log(Level.Error, "In IFailedTask: e.type: {0}, e.message: {1}.", e.GetType(), e.Message); - Logger.Log(Level.Error, "In IFailedTask: value.Data.Value: {0}, value.Message {1}.", failedException, value.Message); - Assert.Equal(typeof(Exception), e.GetType()); + Assert.Equal(typeof(TestSerializableException), e.GetType()); Assert.Equal(TaskKilledByDriver, e.Message); - Assert.Contains(TaskKilledByDriver, failedException); - - value.GetActiveContext().Value.Dispose(); } /// <summary> @@ -272,7 +268,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } 
else if (msg.Equals(ExitByException)) { - throw new Exception(TaskKilledByDriver); + throw new TestSerializableException(TaskKilledByDriver); } } else http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventHandle.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventHandle.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventHandle.cs new file mode 100644 index 0000000..b30302a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/EventHandle.cs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Threading; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Tests.Functional.Common +{ + /// <summary> + /// An test EventHandle that simply wraps around a <see cref="ManualResetEventSlim"/>. + /// </summary> + public sealed class EventHandle + { + private readonly ManualResetEventSlim _eventHandle = new ManualResetEventSlim(); + + [Inject] + private EventHandle() + { + } + + /// <summary> + /// Sets the Event. 
+ /// </summary> + public void Signal() + { + _eventHandle.Set(); + } + + /// <summary> + /// Waits for the event signal. + /// </summary> + public void Wait() + { + _eventHandle.Wait(); + } + + /// <summary> + /// Resets the event signal. + /// </summary> + public void Reset() + { + _eventHandle.Reset(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs index 6e66dac..b24f26e 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs @@ -26,23 +26,28 @@ namespace Org.Apache.REEF.Tests.Functional.Common.Task.Handlers internal abstract class ExceptionThrowingHandler<T> : IObserver<T> { private readonly Exception _exceptionToThrow; - private readonly Action<T> _action; + private readonly Action<T> _finallyAction; protected ExceptionThrowingHandler( - Exception exceptionToThrow, Action<T> action = null) + Exception exceptionToThrow, Action<T> finallyAction = null) { _exceptionToThrow = exceptionToThrow; - _action = action; + _finallyAction = finallyAction; } public void OnNext(T value) { - if (_action != null) + try { - _action(value); + throw _exceptionToThrow; + } + finally + { + if (_finallyAction != null) + { + _finallyAction(value); + } } - - throw _exceptionToThrow; } public void OnError(Exception error) http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCloseExceptionTest.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCloseExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCloseExceptionTest.cs new file mode 100644 index 0000000..8571531 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCloseExceptionTest.cs @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Runtime.Serialization; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions; +using Org.Apache.REEF.Tests.Functional.Common; +using Org.Apache.REEF.Tests.Functional.Common.Task; +using Org.Apache.REEF.Tests.Functional.Common.Task.Handlers; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Failure.User +{ + /// <summary> + /// This test class contains a test that validates that an Exception in the + /// TaskCloseHandler causes a FailedTask event in the Driver. 
+ /// </summary> + [Collection("FunctionalTests")] + public sealed class TaskCloseExceptionTest : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskCloseExceptionTest)); + + private const string TaskCloseExceptionMessage = "TaskCloseExceptionMessage"; + private const string FailedEvaluatorReceived = "FailedEvaluatorReceived"; + + /// <summary> + /// This test validates that an Exception in the TaskCloseHandler causes a FailedTask + /// event in the Driver, and that a new Task can be submitted on the original Context. + /// </summary> + [Fact] + public void TestCloseTaskWithExceptionOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<TaskCloseExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TaskCloseExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<TaskCloseExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<TaskCloseExceptionTestDriver>.Class) + .Build(), typeof(TaskCloseExceptionTestDriver), 1, "testCloseTaskWithExceptionOnLocalRuntime", "local", testFolder); + + var driverMessages = new List<string> + { + FailedEvaluatorReceived + }; + + ValidateSuccessForLocalRuntime(numberOfContextsToClose: 0, numberOfEvaluatorsToFail: 1, testFolder: testFolder); + ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, testFolder, 1); + ValidateMessageSuccessfullyLogged(driverMessages, "driver", DriverStdout, testFolder, 1); + CleanUp(testFolder); + } + + private sealed class TaskCloseExceptionTestDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IRunningTask>, + IObserver<IFailedEvaluator> + { + private static readonly string TaskId = "TaskId"; + + private readonly IEvaluatorRequestor _requestor; + + [Inject] + private 
TaskCloseExceptionTestDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + // submit the first Task. + value.SubmitTask(TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<TaskCloseExceptionTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<TaskCloseHandlerWithException>.Class) + .Build()); + } + + public void OnNext(IRunningTask value) + { + if (value.Id == TaskId) + { + value.Dispose(); + } + } + + public void OnNext(IFailedEvaluator value) + { + Assert.True(value.FailedTask.IsPresent()); + var failedTask = value.FailedTask.Value; + + Assert.Equal(TaskId, failedTask.Id); + + // Check that Exceptions are deserialized correctly. + var ex = value.EvaluatorException.InnerException; + if (ex == null) + { + throw new Exception("Exception was not expected to be null."); + } + + var taskCloseEx = ex as TestSerializableException; + + if (taskCloseEx == null) + { + throw new Exception("Expected Exception to be of type TaskCloseExceptionTestException, but instead got type " + ex.GetType().Name); + } + + if (taskCloseEx.Message != TaskCloseExceptionMessage) + { + throw new Exception( + "Expected message to be " + TaskCloseExceptionMessage + " but instead got " + taskCloseEx.Message + "."); + } + + Logger.Log(Level.Info, FailedEvaluatorReceived); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private sealed class TaskCloseExceptionTask : WaitingTask + { + [Inject] + private TaskCloseExceptionTask(EventMonitor monitor) : base(monitor) + { + } + } + + private sealed class TaskCloseHandlerWithException : ExceptionThrowingHandler<ICloseEvent> + { + [Inject] + private 
TaskCloseHandlerWithException(EventMonitor monitor) : base( + new TestSerializableException(TaskCloseExceptionMessage), + close => { monitor.Signal(); }) + { + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs index 8e2962f..675dc5d 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading; using Org.Apache.REEF.Common.Context; @@ -74,7 +75,7 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant { string testFolder = DefaultRuntimeFolder + TestId; TestRun(DriverConfigurations(), typeof(ResubmitTaskTestDriver), 2, "TestResubimitTask", "local", testFolder); - ValidateSuccessForLocalRuntime(2, 2, 0, testFolder); + ValidateSuccessForLocalRuntime(1, 0, 1, testFolder); CleanUp(testFolder); } @@ -90,7 +91,7 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant .Set(DriverConfiguration.OnContextActive, GenericType<ResubmitTaskTestDriver>.Class) .Set(DriverConfiguration.OnTaskRunning, GenericType<ResubmitTaskTestDriver>.Class) .Set(DriverConfiguration.OnTaskCompleted, GenericType<ResubmitTaskTestDriver>.Class) - .Set(DriverConfiguration.OnTaskFailed, GenericType<ResubmitTaskTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<ResubmitTaskTestDriver>.Class) .Build(); } @@ -102,7 +103,7 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<ICompletedTask>, - 
IObserver<IFailedTask>, + IObserver<IFailedEvaluator>, IObserver<IRunningTask> { private readonly IEvaluatorRequestor _requestor; @@ -154,19 +155,17 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant } /// <summary> - /// Verify when exception is shown in task, IFailedTask will be received here with the message set in the task - /// And verify the context associated with the failed task is the same as the context that the task was submitted + /// Verify when exception is shown in TaskCloseHandler, IFailedEvaluator will be received here with the message set in the task /// </summary> - /// <param name="value"></param> - public void OnNext(IFailedTask value) + public void OnNext(IFailedEvaluator value) { - var failedExeption = ByteUtilities.ByteArraysToString(value.Data.Value); - Logger.Log(Level.Error, "In IFailedTask: " + failedExeption); + Assert.True(value.FailedTask.IsPresent()); + var failedExeption = value.EvaluatorException.InnerException; + Assert.Contains(TaskKilledByDriver, failedExeption.Message); - VerifyContextTaskMapping(value.Id, value.GetActiveContext().Value.Id); - Assert.Contains(TaskKilledByDriver, failedExeption); + Logger.Log(Level.Error, "In IFailedEvaluator: " + failedExeption); - OnNext(value.GetActiveContext().Value); + VerifyContextTaskMapping(value.FailedTask.Value.Id, value.FailedContexts.Single().Id); } private void VerifyContextTaskMapping(string taskId, string contextId) @@ -189,11 +188,10 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant switch (value.Id) { case TaskId + "1": - case TaskId + "2": value.Dispose(Encoding.UTF8.GetBytes(KillTaskCommandFromDriver)); break; + case TaskId + "2": case TaskId + "3": - case TaskId + "4": value.Send(Encoding.UTF8.GetBytes(CompleteTaskCommandFromDriver)); break; default: http://git-wip-us.apache.org/repos/asf/reef/blob/4ab6a8d1/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index aed5d5c..f56b2bb 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -84,7 +84,6 @@ under the License. <Compile Include="Functional\Failure\User\ServiceConstructorExceptionTest.cs" /> <Compile Include="Functional\Failure\User\ReceiveContextMessageExceptionTest.cs" /> <Compile Include="Functional\Failure\User\ContextStartExceptionTest.cs" /> - <Compile Include="Functional\Common\Task\WaitingTask.cs" /> <Compile Include="Functional\Failure\User\ReceiveTaskMessageExceptionTest.cs" /> <Compile Include="Functional\Failure\User\TaskCallExceptionTest.cs" /> <Compile Include="Functional\Bridge\Exceptions\TestNonSerializableException.cs" /> @@ -97,6 +96,7 @@ under the License. <Compile Include="Functional\Common\Task\Handlers\ExceptionThrowingHandler.cs" /> <Compile Include="Functional\Failure\User\TaskStartExceptionTest.cs" /> <Compile Include="Functional\Failure\User\UnhandledThreadExceptionInTaskTest.cs" /> + <Compile Include="Functional\Common\Task\WaitingTask.cs" /> <Compile Include="Functional\Driver\DriverTestStartHandler.cs" /> <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithActiveContextDriver.cs" /> <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithRunningTaskDriver.cs" /> @@ -107,6 +107,7 @@ under the License. <Compile Include="Functional\Failure\TestEvaluatorWithActiveContextImmediatePoison.cs" /> <Compile Include="Functional\Failure\TestEvaluatorWithRunningTaskImmediatePoison.cs" /> <Compile Include="Functional\Failure\SleepTask.cs" /> + <Compile Include="Functional\Failure\User\TaskCloseExceptionTest.cs" /> <Compile Include="Functional\Failure\User\TaskConstructorExceptionTest.cs" /> <Compile Include="Functional\Failure\User\TaskStopExceptionTest.cs" /> <Compile Include="Functional\FaultTolerant\TestContextStart.cs" />
