Repository: reef
Updated Branches:
  refs/heads/master d5a671b85 -> b5647362a


[REEF-1345] Define IMRU task exceptions

  * Create IMRU task exceptions
  * Add test cases
  * Update Task Manager to get exception from IFailedTask
  * Update Task Manager test cases

JIRA:
  [REEF-1345](https://issues.apache.org/jira/browse/REEF-1345)

Pull Request:
  This closes #1014


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b5647362
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b5647362
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b5647362

Branch: refs/heads/master
Commit: b5647362a4d713983719eee2e7b79826b5071370
Parents: d5a671b
Author: Julia Wang <[email protected]>
Authored: Tue May 24 11:28:43 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Wed Jun 1 17:34:06 2016 -0700

----------------------------------------------------------------------
 .../TestTaskManager.cs                          |  16 ++
 .../OnREEF/Driver/TaskManager.cs                |  32 +--
 .../OnREEF/IMRUTasks/IMRUTaskAppException.cs    |  54 ++++
 .../IMRUTaskGroupCommunicationException.cs      |  56 ++++
 .../OnREEF/IMRUTasks/IMRUTaskSystemException.cs |  56 ++++
 .../Org.Apache.REEF.IMRU.csproj                 |   3 +
 .../Properties/AssemblyInfo.cs                  |   7 +
 .../Functional/IMRU/TestTaskExceptions.cs       | 258 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   5 +
 9 files changed, 472 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
index 319f541..bdcebc2 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -22,6 +22,7 @@ using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
 using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities;
@@ -623,9 +624,24 @@ namespace Org.Apache.REEF.IMRU.Tests
         /// <returns></returns>
         private static IFailedTask CreateMockFailedTask(string taskId, string 
errorMsg)
         {
+            Exception taskException;
+            switch (errorMsg)
+            {
+                case TaskManager.TaskAppError:
+                    taskException = new IMRUTaskAppException(errorMsg);
+                    break;
+                case TaskManager.TaskGroupCommunicationError:
+                    taskException = new 
IMRUTaskGroupCommunicationException(errorMsg);
+                    break;
+                default:
+                    taskException = new IMRUTaskSystemException(errorMsg);
+                    break;
+            }
+
             IFailedTask failedtask = Substitute.For<IFailedTask>();
             failedtask.Id.Returns(taskId);
             failedtask.Message.Returns(errorMsg);
+            failedtask.AsError().Returns(taskException);
             return failedtask;
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index 78af207..584809b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -23,6 +23,7 @@ using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Diagnostics;
@@ -220,7 +221,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
             //// Remove the task from running tasks if it exists there
             _runningTasks.Remove(failedTask.Id);
-            UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
+            UpdateState(failedTask.Id, 
GetTaskErrorEventByExceptionType(failedTask));
         }
 
         /// <summary>
@@ -239,7 +240,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             }
             else if (taskState != 
StateMachine.TaskState.TaskFailedByEvaluatorFailure)
             {
-                UpdateState(failedTask.Id, GetTaskErrorEvent(failedTask));
+                UpdateState(failedTask.Id, 
GetTaskErrorEventByExceptionType(failedTask));
             }
         }
 
@@ -354,24 +355,25 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Gets error type based on the information in IFailedTask 
-        /// Currently we use the Message in IFailedTask to distinguish 
different types of errors
+        /// Gets error type based on the exception type in IFailedTask 
         /// </summary>
         /// <param name="failedTask"></param>
         /// <returns></returns>
-        private TaskStateEvent GetTaskErrorEvent(IFailedTask failedTask)
+        private TaskStateEvent GetTaskErrorEventByExceptionType(IFailedTask 
failedTask)
         {
-            switch (failedTask.Message)
+            var exception = failedTask.AsError();
+            if (exception is IMRUTaskAppException)
             {
-                case TaskAppError:
-                    _numberOfAppErrors++;
-                    return TaskStateEvent.FailedTaskAppError;
-                case TaskSystemError:
-                    return TaskStateEvent.FailedTaskSystemError;
-                case TaskGroupCommunicationError:
-                    return TaskStateEvent.FailedTaskCommunicationError;
-                default:
-                    return TaskStateEvent.FailedTaskSystemError;
+                _numberOfAppErrors++;
+                return TaskStateEvent.FailedTaskAppError;
+            }
+            if (exception is IMRUTaskGroupCommunicationException)
+            {
+                return TaskStateEvent.FailedTaskCommunicationError;
+            }
+            else
+            {
+                return TaskStateEvent.FailedTaskSystemError;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskAppException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskAppException.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskAppException.cs
new file mode 100644
index 0000000..bc0893c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskAppException.cs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+    /// <summary>
+    /// Serializable exception that represents an application error raised by an IMRU task.
+    /// All application-related errors should be reported with this exception type.
+    /// When the driver receives this type of exception, the system is not going to recover.
+    /// </summary>
+    [Serializable]
+    public sealed class IMRUTaskAppException : Exception
+    {
+        /// <summary>
+        /// Creates a task application error that carries only a message.
+        /// </summary>
+        /// <param name="message">Description of the application error.</param>
+        public IMRUTaskAppException(string message) : base(message)
+        {
+        }
+
+        /// <summary>
+        /// Creates a task application error that wraps the exception which caused it.
+        /// </summary>
+        /// <param name="message">Description of the application error.</param>
+        /// <param name="innerException">The underlying exception being wrapped.</param>
+        public IMRUTaskAppException(string message, Exception innerException) : base(message, innerException)
+        {
+        }
+
+        /// <summary>
+        /// Serialization constructor. Forwards the serialized state to <see cref="Exception"/>.
+        /// </summary>
+        /// <param name="info">The serialized exception data.</param>
+        /// <param name="context">The streaming context passed on to the base class.</param>
+        public IMRUTaskAppException(SerializationInfo info, StreamingContext context) : base(info, context)
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskGroupCommunicationException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskGroupCommunicationException.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskGroupCommunicationException.cs
new file mode 100644
index 0000000..7352056
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskGroupCommunicationException.cs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+    /// <summary>
+    /// Serializable exception that represents a group communication error in an IMRU task.
+    /// All errors caused by group communication should be wrapped in this exception type.
+    /// For example, when one node in the group fails, the other nodes can no longer receive
+    /// messages and fail as well; that failure should be reported with this type.
+    /// When the driver receives this type of exception, it is recoverable.
+    /// </summary>
+    [Serializable]
+    internal sealed class IMRUTaskGroupCommunicationException : Exception
+    {
+        /// <summary>
+        /// Creates a group communication error that carries only a message.
+        /// </summary>
+        /// <param name="message">Description of the communication error.</param>
+        public IMRUTaskGroupCommunicationException(string message) : base(message)
+        {
+        }
+
+        /// <summary>
+        /// Creates a group communication error that wraps the exception which caused it.
+        /// </summary>
+        /// <param name="message">Description of the communication error.</param>
+        /// <param name="innerException">The underlying exception being wrapped.</param>
+        public IMRUTaskGroupCommunicationException(string message, Exception innerException) : base(message, innerException)
+        {
+        }
+
+        /// <summary>
+        /// Serialization constructor. Forwards the serialized state to <see cref="Exception"/>.
+        /// </summary>
+        /// <param name="info">The serialized exception data.</param>
+        /// <param name="context">The streaming context passed on to the base class.</param>
+        public IMRUTaskGroupCommunicationException(SerializationInfo info, StreamingContext context) : base(info, context)
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskSystemException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskSystemException.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskSystemException.cs
new file mode 100644
index 0000000..198146a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/IMRUTaskSystemException.cs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Runtime.Serialization;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+    /// <summary>
+    /// Serializable exception that represents a system error in an IMRU task.
+    /// It can be thrown when an exception happens in an IMRU task and the driver should know
+    /// that the failure originates from the IMRU system itself, for example when a task is
+    /// forced to close by the driver.
+    /// When the driver receives this type of exception, it is recoverable.
+    /// </summary>
+    [Serializable]
+    internal sealed class IMRUTaskSystemException : Exception
+    {
+        /// <summary>
+        /// Creates a task system error that carries only a message.
+        /// </summary>
+        /// <param name="message">Description of the system error.</param>
+        public IMRUTaskSystemException(string message) : base(message)
+        {
+        }
+
+        /// <summary>
+        /// Creates a task system error that wraps the exception which caused it.
+        /// </summary>
+        /// <param name="message">Description of the system error.</param>
+        /// <param name="innerException">The underlying exception being wrapped.</param>
+        public IMRUTaskSystemException(string message, Exception innerException) : base(message, innerException)
+        {
+        }
+
+        /// <summary>
+        /// Serialization constructor. Forwards the serialized state to <see cref="Exception"/>.
+        /// </summary>
+        /// <param name="info">The serialized exception data.</param>
+        /// <param name="context">The streaming context passed on to the base class.</param>
+        public IMRUTaskSystemException(SerializationInfo info, StreamingContext context) : base(info, context)
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index b51638a..da19271 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -90,6 +90,9 @@ under the License.
     <Compile 
Include="OnREEF\Driver\StateMachine\TaskStateTransitionException.cs" />
     <Compile Include="OnREEF\Driver\TaskInfo.cs" />
     <Compile Include="OnREEF\Driver\TaskManager.cs" />
+    <Compile Include="OnREEF\IMRUTasks\IMRUTaskAppException.cs" />
+    <Compile Include="OnREEF\IMRUTasks\IMRUTaskGroupCommunicationException.cs" 
/>
+    <Compile Include="OnREEF\IMRUTasks\IMRUTaskSystemException.cs" />
     <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
     <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" 
/>

http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs 
b/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs
index e426032..50b70e1 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs
@@ -38,3 +38,10 @@ using System.Runtime.InteropServices;
  
"9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc"
 +
  
"b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17"
 +
  
"618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]
+
+// Allow the tests project access to `internal` APIs
+[assembly: InternalsVisibleTo("Org.Apache.REEF.Tests, publickey=" +
+ 
"00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9"
 +
+ 
"9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc"
 +
+ 
"b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17"
 +
+ 
"618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")]

http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestTaskExceptions.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestTaskExceptions.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestTaskExceptions.cs
new file mode 100644
index 0000000..08816cb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestTaskExceptions.cs
@@ -0,0 +1,258 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    /// <summary>
+    /// Functional tests for the IMRU task exception types. Each test submits a task that
+    /// throws one of the exceptions and lets the driver check, in its IFailedTask handler,
+    /// that the message, the exception type and (for the system exception) the inner
+    /// exception arrive as expected.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public class TestTaskExceptions : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(TestTaskExceptions));
+        private const string TaskId = "taskId";
+
+        // Logged by the driver once all checks on a failed task have passed.
+        private const string ValidFailedTaskMessage = "ValidFailedTaskMessage";
+
+        // Logged by the driver if the task unexpectedly completes.
+        private const string TaskCompletedMessage = "TaskCompletedMessage";
+        private const string InnerExceptionMessage = "InnerExceptionMessage";
+
+        /// <summary>
+        /// Test IMRUTaskAppException
+        /// </summary>
+        [Fact]
+        public void TestTaskAppException()
+        {
+            RunFailedTaskTest(TaskId + 1, TaskManager.TaskAppError);
+        }
+
+        /// <summary>
+        /// Test IMRUTaskGroupCommunicationException
+        /// </summary>
+        [Fact]
+        public void TestTaskGroupCommunicationException()
+        {
+            RunFailedTaskTest(TaskId + 2, TaskManager.TaskGroupCommunicationError);
+        }
+
+        /// <summary>
+        /// Test IMRUTaskSystemException
+        /// </summary>
+        [Fact]
+        public void TestTaskSystemException()
+        {
+            RunFailedTaskTest(TaskId + 3, TaskManager.TaskSystemError);
+        }
+
+        /// <summary>
+        /// Runs the failing task on the local runtime in a fresh test folder and validates the
+        /// driver log: ValidFailedTaskMessage is expected with count 1 and TaskCompletedMessage
+        /// with count 0.
+        /// </summary>
+        /// <param name="taskId">Task identifier; the driver selects its checks based on it.</param>
+        /// <param name="errorMessage">Message handed to the task, which decides the exception it throws.</param>
+        private void RunFailedTaskTest(string taskId, string errorMessage)
+        {
+            string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfigurations(GetTaskConfiguration(taskId, errorMessage)), typeof(FailedTaskDriver), 1, "TestTaskExceptions", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, 1, 0, testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(ValidFailedTaskMessage, testFolder, 1);
+            ValidateMessageSuccessfullyLoggedForDriver(TaskCompletedMessage, testFolder, 0);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Builds the driver configuration: registers FailedTaskDriver for the driver-started,
+        /// evaluator-allocated, task-failed and task-completed events and binds the serialized
+        /// task configuration to TaskConfigurationString.
+        /// </summary>
+        /// <param name="taskConfig">Task configuration to serialize into the driver configuration.</param>
+        /// <returns>The driver configuration.</returns>
+        private IConfiguration DriverConfigurations(IConfiguration taskConfig)
+        {
+            var driverConfig = DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, GenericType<FailedTaskDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<FailedTaskDriver>.Class)
+                .Set(DriverConfiguration.OnTaskFailed, GenericType<FailedTaskDriver>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, GenericType<FailedTaskDriver>.Class)
+                .Build();
+
+            AvroConfigurationSerializer serializer = new AvroConfigurationSerializer();
+            return TangFactory.GetTang().NewConfigurationBuilder(driverConfig)
+                .BindStringNamedParam<TaskConfigurationString>(serializer.ToString(taskConfig))
+                .Build();
+        }
+
+        /// <summary>
+        /// Builds the configuration for an ExceptionTask with the given id and exception message.
+        /// </summary>
+        /// <param name="taskId">Identifier assigned to the task.</param>
+        /// <param name="message">Value bound to TaskExceptionMessage.</param>
+        /// <returns>The merged task configuration.</returns>
+        private IConfiguration GetTaskConfiguration(string taskId, string message)
+        {
+            var taskConfig = TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, taskId)
+                .Set(TaskConfiguration.Task, GenericType<ExceptionTask>.Class)
+                .Build();
+
+            var additionalConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindNamedParameter<TaskExceptionMessage, string>(
+                    GenericType<TaskExceptionMessage>.Class, message)
+                .Build();
+
+            return Configurations.Merge(additionalConfig, taskConfig);
+        }
+
+        /// <summary>
+        /// Driver that requests one evaluator, submits the configured task to it and verifies
+        /// the resulting IFailedTask.
+        /// </summary>
+        private sealed class FailedTaskDriver : IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>,
+            IObserver<IFailedTask>, IObserver<ICompletedTask>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+            private readonly IConfiguration _taskConfiguration;
+
+            [Inject]
+            private FailedTaskDriver(
+                IEvaluatorRequestor requestor,
+                [Parameter(typeof(TaskConfigurationString))] string taskConfigString,
+                AvroConfigurationSerializer avroConfigurationSerializer)
+            {
+                _requestor = requestor;
+                _taskConfiguration = avroConfigurationSerializer.FromString(taskConfigString);
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                value.SubmitTask(_taskConfiguration);
+            }
+
+            /// <summary>
+            /// Verifies the exception message and exception type for each task id, then logs
+            /// ValidFailedTaskMessage and disposes the active context.
+            /// </summary>
+            /// <param name="value">The failed task reported to the driver.</param>
+            public void OnNext(IFailedTask value)
+            {
+                // Fetch the error once and tolerate null here, so that a missing exception is
+                // reported by the checks below rather than as a NullReferenceException while logging.
+                var error = value.AsError();
+                var msg = string.Format(CultureInfo.InvariantCulture,
+                    "In IFailedTask, taskId: {0}, Message {1}, Exception {2}.",
+                    value.Id,
+                    value.Message,
+                    error == null ? null : error.GetType());
+                Logger.Log(Level.Info, msg);
+
+                if (value.Id.Equals(TaskId + 1))
+                {
+                    VerifyFailure<IMRUTaskAppException>(value, error, TaskManager.TaskAppError);
+                }
+
+                if (value.Id.Equals(TaskId + 2))
+                {
+                    VerifyFailure<IMRUTaskGroupCommunicationException>(value, error, TaskManager.TaskGroupCommunicationError);
+                }
+
+                if (value.Id.Equals(TaskId + 3))
+                {
+                    VerifyFailure<IMRUTaskSystemException>(value, error, TaskManager.TaskSystemError);
+
+                    // Only the system exception is thrown with an inner exception by ExceptionTask.
+                    Assert.NotNull(error.InnerException);
+                    Assert.Equal(InnerExceptionMessage, error.InnerException.Message);
+                    Assert.True(error.InnerException is System.SystemException);
+                }
+
+                Logger.Log(Level.Info, ValidFailedTaskMessage);
+                value.GetActiveContext().Value.Dispose();
+            }
+
+            /// <summary>
+            /// Checks that the failed task carries the expected message and that its error is
+            /// of the expected exception type.
+            /// </summary>
+            /// <typeparam name="TException">Exception type the task is expected to have thrown.</typeparam>
+            /// <param name="failedTask">The failed task reported to the driver.</param>
+            /// <param name="error">Result of failedTask.AsError(); may be null.</param>
+            /// <param name="expectedMessage">Message the failed task is expected to carry.</param>
+            private static void VerifyFailure<TException>(IFailedTask failedTask, Exception error, string expectedMessage)
+                where TException : Exception
+            {
+                Assert.Equal(expectedMessage, failedTask.Message);
+
+                // 'is' evaluates to false for null, so this covers the missing-error case too.
+                if (!(error is TException))
+                {
+                    throw new Exception(string.Format(CultureInfo.InvariantCulture, "Exception {0} should have been serialized properly.", typeof(TException)));
+                }
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                Logger.Log(Level.Info, TaskCompletedMessage);
+                throw new Exception("Did not expect a completed task.");
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// Test task that throws an exception chosen by the message injected into its constructor.
+        /// </summary>
+        private sealed class ExceptionTask : ITask
+        {
+            private readonly string _taskExceptionMessage;
+
+            [Inject]
+            private ExceptionTask([Parameter(typeof(TaskExceptionMessage))] string taskExceptionMessage)
+            {
+                _taskExceptionMessage = taskExceptionMessage;
+            }
+
+            public void Dispose()
+            {
+            }
+
+            /// <summary>
+            /// Always throws: an IMRUTaskAppException or IMRUTaskGroupCommunicationException when
+            /// the message matches the corresponding TaskManager constant, otherwise an
+            /// IMRUTaskSystemException wrapping a SystemException.
+            /// </summary>
+            /// <param name="memento">Not used.</param>
+            /// <returns>Never returns normally.</returns>
+            public byte[] Call(byte[] memento)
+            {
+                Logger.Log(Level.Info, "In ExceptionTask.Call(), _taskExceptionMessage: " + _taskExceptionMessage);
+                switch (_taskExceptionMessage)
+                {
+                    case TaskManager.TaskAppError:
+                        throw new IMRUTaskAppException(_taskExceptionMessage);
+                    case TaskManager.TaskGroupCommunicationError:
+                        throw new IMRUTaskGroupCommunicationException(_taskExceptionMessage);
+                    default:
+                        throw new IMRUTaskSystemException(_taskExceptionMessage, new SystemException(InnerExceptionMessage));
+                }
+            }
+        }
+
+        [NamedParameter("Task exception message", "TaskExceptionMessage")]
+        private class TaskExceptionMessage : Name<string>
+        {
+        }
+
+        [NamedParameter("Task Configuration string", "TaskConfigurationString")]
+        private class TaskConfigurationString : Name<string>
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b5647362/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 066f1c2..3840a38 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -101,6 +101,7 @@ under the License.
     <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" />
     <Compile Include="Functional\IMRU\IMRUBroadcastReduceTest.cs" />
     <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
+    <Compile Include="Functional\IMRU\TestTaskExceptions.cs" />
     <Compile 
Include="Functional\Messaging\TestContextMessageSourceAndHandler.cs" />
     <Compile Include="Functional\Messaging\TestMessageEventManager.cs" />
     <Compile 
Include="Functional\RuntimeName\EvaluatorRequestingDriverSpecifyingDefaultRuntimeName.cs"
 />
@@ -182,6 +183,10 @@ under the License.
       <Project>{6dc3b04e-2b99-4fda-bd23-2c7864f4c477}</Project>
       <Name>Org.Apache.REEF.IMRU.Examples</Name>
     </ProjectReference>
+    <ProjectReference 
Include="..\Org.Apache.REEF.IMRU\Org.Apache.REEF.IMRU.csproj">
+      <Project>{cc797c57-b465-4d11-98ac-edaaef5899a6}</Project>
+      <Name>Org.Apache.REEF.IMRU</Name>
+    </ProjectReference>
   </ItemGroup>
   <ItemGroup>
     <Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />

Reply via email to