Repository: reef Updated Branches: refs/heads/master d5a671b85 -> b5647362a
[REEF-1345] Define IMRU task exceptions * Create IMRU task exceptions * Add test cases * Update Task Manager to get exception from IFailedTask * Update Task Manager test cases JIRA: [REEF-1345](https://issues.apache.org/jira/browse/REEF-1345) Pull Request: This closes #1014 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b5647362 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b5647362 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b5647362 Branch: refs/heads/master Commit: b5647362a4d713983719eee2e7b79826b5071370 Parents: d5a671b Author: Julia Wang <[email protected]> Authored: Tue May 24 11:28:43 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Wed Jun 1 17:34:06 2016 -0700 ---------------------------------------------------------------------- .../TestTaskManager.cs | 16 ++ .../OnREEF/Driver/TaskManager.cs | 32 +-- .../OnREEF/IMRUTasks/IMRUTaskAppException.cs | 54 ++++ .../IMRUTaskGroupCommunicationException.cs | 56 ++++ .../OnREEF/IMRUTasks/IMRUTaskSystemException.cs | 56 ++++ .../Org.Apache.REEF.IMRU.csproj | 3 + .../Properties/AssemblyInfo.cs | 7 + .../Functional/IMRU/TestTaskExceptions.cs | 258 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 5 + 9 files changed, 472 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs index 319f541..bdcebc2 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs @@ -22,6 +22,7 @@ using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.IMRU.OnREEF.Driver; using 
Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; +using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; @@ -623,9 +624,24 @@ namespace Org.Apache.REEF.IMRU.Tests /// <returns></returns> private static IFailedTask CreateMockFailedTask(string taskId, string errorMsg) { + Exception taskException; + switch (errorMsg) + { + case TaskManager.TaskAppError: + taskException = new IMRUTaskAppException(errorMsg); + break; + case TaskManager.TaskGroupCommunicationError: + taskException = new IMRUTaskGroupCommunicationException(errorMsg); + break; + default: + taskException = new IMRUTaskSystemException(errorMsg); + break; + } + IFailedTask failedtask = Substitute.For<IFailedTask>(); failedtask.Id.Returns(taskId); failedtask.Message.Returns(errorMsg); + failedtask.AsError().Returns(taskException); return failedtask; } http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs index 78af207..584809b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs @@ -23,6 +23,7 @@ using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; +using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Diagnostics; @@ -220,7 +221,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver { //// Remove the task from running tasks if it exists there _runningTasks.Remove(failedTask.Id); - UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask)); + 
UpdateState(failedTask.Id, GetTaskErrorEventByExceptionType(failedTask)); } /// <summary> @@ -239,7 +240,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } else if (taskState != StateMachine.TaskState.TaskFailedByEvaluatorFailure) { - UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask)); + UpdateState(failedTask.Id, GetTaskErrorEventByExceptionType(failedTask)); } } @@ -354,24 +355,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Gets error type based on the information in IFailedTask - /// Currently we use the Message in IFailedTask to distinguish different types of errors + /// Gets error type based on the exception type in IFailedTask /// </summary> /// <param name="failedTask"></param> /// <returns></returns> - private TaskStateEvent GetTaskErrorEvent(IFailedTask failedTask) + private TaskStateEvent GetTaskErrorEventByExceptionType(IFailedTask failedTask) { - switch (failedTask.Message) + var exception = failedTask.AsError(); + if (exception is IMRUTaskAppException) { - case TaskAppError: - _numberOfAppErrors++; - return TaskStateEvent.FailedTaskAppError; - case TaskSystemError: - return TaskStateEvent.FailedTaskSystemError; - case TaskGroupCommunicationError: - return TaskStateEvent.FailedTaskCommunicationError; - default: - return TaskStateEvent.FailedTaskSystemError; + _numberOfAppErrors++; + return TaskStateEvent.FailedTaskAppError; + } + if (exception is IMRUTaskGroupCommunicationException) + { + return TaskStateEvent.FailedTaskCommunicationError; + } + else + { + return TaskStateEvent.FailedTaskSystemError; } } http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskAppException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskAppException.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskAppException.cs new file mode 100644 index 0000000..bc0893c --- /dev/null +++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskAppException.cs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Runtime.Serialization; + +namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks +{ + /// <summary> + /// A serializable exception that represents a task application error. + /// </summary> + [Serializable] + public sealed class IMRUTaskAppException : Exception + { + /// <summary> + /// Constructor. A serializable exception object that represents a task application error. + /// All application-related errors should be captured in this type of exception. + /// When driver receives this type of exception, the system is not going to recover. + /// </summary> + public IMRUTaskAppException(string message) + : base(message) + { + } + + /// <summary> + /// Constructor. 
A serializable exception object that represents a task application error and wraps an inner exception + /// </summary> + /// <param name="message"></param> + /// <param name="innerException"></param> + public IMRUTaskAppException(string message, Exception innerException) + : base(message, innerException) + { + } + + public IMRUTaskAppException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskGroupCommunicationException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskGroupCommunicationException.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskGroupCommunicationException.cs new file mode 100644 index 0000000..7352056 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskGroupCommunicationException.cs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Runtime.Serialization; + +namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks +{ + /// <summary> + /// A serializable exception that represents a task group communication error. + /// </summary> + [Serializable] + internal sealed class IMRUTaskGroupCommunicationException : Exception + { + /// <summary> + /// Constructor. A serializable exception object that represents a task group communication error. + /// All the errors caused by group communication should be wrapped in this type of exception. For example, + /// when one of the nodes in the group fails, other nodes are not able to receive messages + /// therefore fail. We should use this exception type to represent this error. + /// When driver receives this type of exception, it is recoverable. + /// </summary> + public IMRUTaskGroupCommunicationException(string message) + : base(message) + { + } + + /// <summary> + /// Constructor. A serializable exception object that represents a task group communication error and wraps an inner exception + /// </summary> + /// <param name="message"></param> + /// <param name="innerException"></param> + public IMRUTaskGroupCommunicationException(string message, Exception innerException) + : base(message, innerException) + { + } + + public IMRUTaskGroupCommunicationException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskSystemException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskSystemException.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskSystemException.cs new file mode 100644 index 0000000..198146a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskSystemException.cs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more 
contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Runtime.Serialization; + +namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks +{ + /// <summary> + /// A serializable exception that represents a task system error. + /// </summary> + [Serializable] + internal sealed class IMRUTaskSystemException : Exception + { + /// <summary> + /// Constructor. A serializable exception object that represents a task system error. + /// This exception can be thrown when an exception happens in IMRU task and we would like + /// driver to know this type of exception is thrown from IMRU system. + /// For example, when a task is forced to close by the driver, this exception type can be used. + /// When driver receives this type of exception, it is recoverable. + /// </summary> + public IMRUTaskSystemException(string message) + : base(message) + { + } + + /// <summary> + /// Constructor. A serializable exception object that represents a task system error and wraps an inner exception. 
+ /// </summary> + /// <param name="message"></param> + /// <param name="innerException"></param> + public IMRUTaskSystemException(string message, Exception innerException) + : base(message, innerException) + { + } + + public IMRUTaskSystemException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index b51638a..da19271 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -90,6 +90,9 @@ under the License. <Compile Include="OnREEF\Driver\StateMachine\TaskStateTransitionException.cs" /> <Compile Include="OnREEF\Driver\TaskInfo.cs" /> <Compile Include="OnREEF\Driver\TaskManager.cs" /> + <Compile Include="OnREEF\IMRUTasks\IMRUTaskAppException.cs" /> + <Compile Include="OnREEF\IMRUTasks\IMRUTaskGroupCommunicationException.cs" /> + <Compile Include="OnREEF\IMRUTasks\IMRUTaskSystemException.cs" /> <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" /> <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs index e426032..50b70e1 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs @@ -38,3 +38,10 @@ using System.Runtime.InteropServices; 
"9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] + +// Allow the tests project access to `internal` APIs +[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" + + "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" + + "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + + "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestTaskExceptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestTaskExceptions.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestTaskExceptions.cs new file mode 100644 index 0000000..08816cb --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestTaskExceptions.cs @@ -0,0 +1,258 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Globalization; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + /// <summary> + /// This is to test task exceptions for IMRU tasks + /// </summary> + [Collection("FunctionalTests")] + public class TestTaskExceptions : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TestTaskExceptions)); + private const string TaskId = "taskId"; + private const string ValidFailedTaskMessage = "ValidFailedTaskMessage"; + private const string TaskCompletedMessage = "TaskCompletedMessage"; + private const string InnerExceptionMessage = "InnerExceptionMessage"; + + /// <summary> + /// Test IMRUTaskAppException + /// </summary> + [Fact] + public void TestTaskAppException() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfigurations(GetTaskConfiguration(TaskId + 1, TaskManager.TaskAppError)), typeof(FailedTaskDriver), 1, "TestTaskExceptions", "local", testFolder); + ValidateSuccessForLocalRuntime(1, 1, 0, testFolder); + ValidateMessageSuccessfullyLoggedForDriver(ValidFailedTaskMessage, testFolder, 1); + ValidateMessageSuccessfullyLoggedForDriver(TaskCompletedMessage, testFolder, 0); + CleanUp(testFolder); + } + + /// <summary> + /// Test IMRU TestTaskGroupCommunicationException + /// </summary> + 
[Fact] + public void TestTaskGroupCommunicationException() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfigurations(GetTaskConfiguration(TaskId + 2, TaskManager.TaskGroupCommunicationError)), typeof(FailedTaskDriver), 1, "TestTaskExceptions", "local", testFolder); + ValidateSuccessForLocalRuntime(1, 1, 0, testFolder); + ValidateMessageSuccessfullyLoggedForDriver(ValidFailedTaskMessage, testFolder, 1); + ValidateMessageSuccessfullyLoggedForDriver(TaskCompletedMessage, testFolder, 0); + CleanUp(testFolder); + } + + /// <summary> + /// Test IMRUTaskSystemException + /// </summary> + [Fact] + public void TestTaskSystemException() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfigurations(GetTaskConfiguration(TaskId + 3, TaskManager.TaskSystemError)), typeof(FailedTaskDriver), 1, "TestTaskExceptions", "local", testFolder); + ValidateSuccessForLocalRuntime(1, 1, 0, testFolder); + ValidateMessageSuccessfullyLoggedForDriver(ValidFailedTaskMessage, testFolder, 1); + ValidateMessageSuccessfullyLoggedForDriver(TaskCompletedMessage, testFolder, 0); + CleanUp(testFolder); + } + + private IConfiguration DriverConfigurations(IConfiguration taskConfig) + { + var driverConfig = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<FailedTaskDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<FailedTaskDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<FailedTaskDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<FailedTaskDriver>.Class) + .Build(); + + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); + return TangFactory.GetTang().NewConfigurationBuilder(driverConfig) + .BindStringNamedParam<TaskConfigurationString>(serializer.ToString(taskConfig)) + .Build(); + } + + private IConfiguration GetTaskConfiguration(string 
taskId, string message) + { + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, taskId) + .Set(TaskConfiguration.Task, GenericType<ExceptionTask>.Class) + .Build(); + + var additionalConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindNamedParameter<TaskExceptionMessage, string>( + GenericType<TaskExceptionMessage>.Class, message) + .Build(); + + return Configurations.Merge(additionalConfig, taskConfig); + } + + private sealed class FailedTaskDriver : IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>, + IObserver<IFailedTask>, IObserver<ICompletedTask> + { + private readonly IEvaluatorRequestor _requestor; + private readonly IConfiguration _taskConfiguration; + + [Inject] + private FailedTaskDriver( + IEvaluatorRequestor requestor, + [Parameter(typeof(TaskConfigurationString))] string taskConfigString, + AvroConfigurationSerializer avroConfigurationSerializer) + { + _requestor = requestor; + _taskConfiguration = avroConfigurationSerializer.FromString(taskConfigString); + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + value.SubmitTask(_taskConfiguration); + } + + /// <summary> + /// Verify Exception message and exception type for different task id. 
+ /// </summary> + /// <param name="value"></param> + public void OnNext(IFailedTask value) + { + var msg = string.Format(CultureInfo.InvariantCulture, + "In IFailedTask, taskId: {0}, Message {1}, Exception {2}.", + value.Id, + value.Message, + value.AsError().GetType()); + Logger.Log(Level.Info, msg); + + if (value.Id.Equals(TaskId + 1)) + { + Assert.Equal(TaskManager.TaskAppError, value.Message); + if (value.AsError() == null || !(value.AsError() is IMRUTaskAppException)) + { + throw new Exception(string.Format(CultureInfo.InvariantCulture, "Exception {0} should have been serialized properly.", typeof(IMRUTaskAppException))); + } + } + + if (value.Id.Equals(TaskId + 2)) + { + Assert.Equal(TaskManager.TaskGroupCommunicationError, value.Message); + + if (value.AsError() == null || !(value.AsError() is IMRUTaskGroupCommunicationException)) + { + throw new Exception(string.Format(CultureInfo.InvariantCulture, "Exception {0} should have been serialized properly.", typeof(IMRUTaskGroupCommunicationException))); + } + } + + if (value.Id.Equals(TaskId + 3)) + { + Assert.Equal(TaskManager.TaskSystemError, value.Message); + if (value.AsError() == null || !(value.AsError() is IMRUTaskSystemException)) + { + throw new Exception(string.Format(CultureInfo.InvariantCulture, "Exception {0} should have been serialized properly.", typeof(IMRUTaskSystemException))); + } + + Assert.Equal(InnerExceptionMessage, value.AsError().InnerException.Message); + Assert.True(value.AsError().InnerException is System.SystemException); + } + Logger.Log(Level.Info, ValidFailedTaskMessage); + value.GetActiveContext().Value.Dispose(); + } + + public void OnNext(ICompletedTask value) + { + Logger.Log(Level.Info, TaskCompletedMessage); + throw new Exception("Did not expect a completed task."); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + /// <summary> + /// Test task that 
throws exception based on the message specified in the constructor + /// </summary> + private sealed class ExceptionTask : ITask + { + private readonly string _taskExceptionMessage; + + [Inject] + private ExceptionTask([Parameter(typeof(TaskExceptionMessage))] string taskExceptionMessage) + { + _taskExceptionMessage = taskExceptionMessage; + } + + public void Dispose() + { + } + + /// <summary> + /// Throws corresponding exception based on the message received. + /// </summary> + /// <param name="memento"></param> + /// <returns></returns> + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, "In ExceptionTask.Call(), _taskExceptionMessage: " + _taskExceptionMessage); + switch (_taskExceptionMessage) + { + case TaskManager.TaskAppError: + throw new IMRUTaskAppException(_taskExceptionMessage); + case TaskManager.TaskGroupCommunicationError: + throw new IMRUTaskGroupCommunicationException(_taskExceptionMessage); + default: + throw new IMRUTaskSystemException(_taskExceptionMessage, new SystemException(InnerExceptionMessage)); + } + } + } + + [NamedParameter("Task exception message", "TaskExceptionMessage")] + private class TaskExceptionMessage : Name<string> + { + } + + [NamedParameter("Task Configuration string", "TaskConfigurationString")] + private class TaskConfigurationString : Name<string> + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 066f1c2..3840a38 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -101,6 +101,7 @@ under the License. 
<Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" /> <Compile Include="Functional\IMRU\IMRUBroadcastReduceTest.cs" /> <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" /> + <Compile Include="Functional\IMRU\TestTaskExceptions.cs" /> <Compile Include="Functional\Messaging\TestContextMessageSourceAndHandler.cs" /> <Compile Include="Functional\Messaging\TestMessageEventManager.cs" /> <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriverSpecifyingDefaultRuntimeName.cs" /> @@ -182,6 +183,10 @@ under the License. <Project>{6dc3b04e-2b99-4fda-bd23-2c7864f4c477}</Project> <Name>Org.Apache.REEF.IMRU.Examples</Name> </ProjectReference> + <ProjectReference Include="..\Org.Apache.REEF.IMRU\Org.Apache.REEF.IMRU.csproj"> + <Project>{cc797c57-b465-4d11-98ac-edaaef5899a6}</Project> + <Name>Org.Apache.REEF.IMRU</Name> + </ProjectReference> </ItemGroup> <ItemGroup> <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
