Repository: reef
Updated Branches:
  refs/heads/master 798f2dbb0 -> d6dbea6df


[REEF-1428] Validate Task Stop failure => FailedTask Event

This addressed the issue by
  * Writing a test to validate that a Task Stop failure triggers a FailedTask 
Event.
  * Adding helper test classes.
  * Moving the call to TaskStop prior to setting the result of the Task.
  * Inject all ITaskStart and ITaskStop.

JIRA:
  [REEF-1428](https://issues.apache.org/jira/browse/REEF-1428)

This closes #1038


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d6dbea6d
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d6dbea6d
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d6dbea6d

Branch: refs/heads/master
Commit: d6dbea6dfcced497c699e6d055015f72cf102c68
Parents: 798f2db
Author: Andrew Chung <[email protected]>
Authored: Tue Jun 7 16:11:08 2016 -0700
Committer: Julia Wang <[email protected]>
Committed: Thu Jun 9 10:48:19 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |   1 +
 .../Runtime/Evaluator/Context/ContextManager.cs |   1 -
 .../Runtime/Evaluator/Task/TaskLifeCycle.cs     |  44 ++--
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |  11 +-
 .../Runtime/Evaluator/Task/TaskStatus.cs        |  13 +-
 .../Evaluator/Task/TaskStopHandlerException.cs  |  31 +++
 .../Functional/Bridge/TestContextStack.cs       |  14 +-
 .../Task/Handlers/ExceptionThrowingHandler.cs   |  58 ++++++
 .../Common/Task/Handlers/LoggingHandler.cs      |  53 +++++
 .../Functional/Common/Task/LoggingTask.cs       |  48 +++++
 .../Failure/User/TaskStopExceptionTest.cs       | 201 +++++++++++++++++++
 .../Functional/ReefFunctionalTest.cs            |  16 +-
 .../Org.Apache.REEF.Tests.csproj                |   4 +
 13 files changed, 448 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 6a64571..3058b91 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -166,6 +166,7 @@ under the License.
     <Compile Include="Runtime\Evaluator\Task\TaskStartImpl.cs" />
     <Compile Include="Runtime\Evaluator\Task\TaskState.cs" />
     <Compile Include="Runtime\Evaluator\Task\TaskStatus.cs" />
+    <Compile Include="Runtime\Evaluator\Task\TaskStopHandlerException.cs" />
     <Compile Include="Runtime\Evaluator\Task\TaskStopImpl.cs" />
     <Compile Include="Runtime\Evaluator\Utils\NamedparameterAlias.cs" />
     <Compile Include="runtime\MachineStatus.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
index 3f7c5ae..1387d51 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
@@ -350,7 +350,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// <param name="e"></param>
         private void HandleTaskException(TaskClientCodeException e)
         {
-            LOGGER.Log(Level.Error, "TaskClientCodeException", e);
             byte[] error;
             try
             {

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
index 115f4d2..687cb9c 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.Tang.Annotations;
@@ -29,8 +30,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
     {
         private readonly IReadOnlyCollection<IObserver<ITaskStop>> 
_taskStopHandlers;
         private readonly IReadOnlyCollection<IObserver<ITaskStart>> 
_taskStartHandlers;
-        private readonly Optional<ITaskStart> _taskStart;
-        private readonly Optional<ITaskStop> _taskStop;
+        private readonly ITaskStart _taskStart;
+        private readonly ITaskStop _taskStop;
+
+        private int _startHasBeenInvoked = 0;
+        private int _stopHasBeenInvoked = 0;
 
         [Inject]
         private TaskLifeCycle(
@@ -38,15 +42,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             [Parameter(typeof(TaskConfigurationOptions.StopHandlers))] 
ISet<IObserver<ITaskStop>> taskStopHandlers,
             ITaskStart taskStart,
             ITaskStop taskStop)
-            : this(taskStartHandlers, taskStopHandlers, 
Optional<ITaskStart>.Of(taskStart), Optional<ITaskStop>.Of(taskStop))
-        {
-        }
-
-        private TaskLifeCycle(
-            IEnumerable<IObserver<ITaskStart>> taskStartHandlers,
-            IEnumerable<IObserver<ITaskStop>> taskStopHandlers,
-            Optional<ITaskStart> taskStart,
-            Optional<ITaskStop> taskStop)
         {
             _taskStartHandlers = new 
ReadOnlySet<IObserver<ITaskStart>>(taskStartHandlers);
             _taskStopHandlers = new 
ReadOnlySet<IObserver<ITaskStop>>(taskStopHandlers);
@@ -56,27 +51,30 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 
         public void Start() 
         {
-            if (!_taskStart.IsPresent())
+            if (Interlocked.Exchange(ref _startHasBeenInvoked, 1) == 0)
             {
-                return;
-            }
-
-            foreach (var startHandler in _taskStartHandlers)
-            {
-                startHandler.OnNext(_taskStart.Value);
+                foreach (var startHandler in _taskStartHandlers)
+                {
+                    startHandler.OnNext(_taskStart);
+                }
             }
         }
 
         public void Stop() 
         {
-            if (!_taskStop.IsPresent())
+            try
             {
-                return;
+                if (Interlocked.Exchange(ref _stopHasBeenInvoked, 1) == 0)
+                {
+                    foreach (var stopHandler in _taskStopHandlers)
+                    {
+                        stopHandler.OnNext(_taskStop);
+                    }
+                }
             }
-
-            foreach (var stopHandler in _taskStopHandlers)
+            catch (Exception e)
             {
-                stopHandler.OnNext(_taskStop.Value);
+                throw new TaskStopHandlerException("Encountered Exception on 
TaskStopHandler.", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 6297d2e..8a13d63 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -110,6 +110,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                             "Task running result:\r\n" + 
System.Text.Encoding.Default.GetString(result));
                     }
                 }
+                catch (TaskStopHandlerException e)
+                {
+                    _currentStatus.SetException(e.InnerException);
+                }
                 catch (Exception e)
                 {
                     _currentStatus.SetException(e);
@@ -156,11 +160,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 
         public void Close(byte[] message)
         {
-            Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, 
"Trying to close Task {0}", TaskId));
+            Logger.Log(Level.Info, "Trying to close Task {0}", TaskId);
 
             if (_currentStatus.IsNotRunning())
             {
-                Logger.Log(Level.Warning, 
string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in 
{0} state. Ignored.", _currentStatus.State));
+                Logger.Log(Level.Warning, "Trying to close an task that is in 
{0} state. Ignored.", _currentStatus.State);
                 return;
             }
             try
@@ -171,8 +175,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             catch (Exception e)
             {
                 Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error 
during Close.", Logger);
-                _currentStatus.SetException(TaskClientCodeException.Create(
-                    TaskId, ContextId, "Error during Close().", e));
+                _currentStatus.SetException(e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
index 43eb55e..0347033 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
@@ -100,16 +100,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             {
                 try
                 {
-                    if (HasEnded())
-                    {
-                        // Note that this is possible if the job is already 
DONE, but a
-                        // Task Close is triggered prior to the DONE signal 
propagates to the
-                        // Driver. If the Task Close handler is not 
implemented, the Handler will 
-                        // mark the Task with an Exception, although for all 
intents and purposes
-                        // the Task is already done and should not be affected.
-                        return;
-                    }
-
                     if (!_lastException.IsPresent())
                     {
                         _lastException = Optional<Exception>.Of(e);
@@ -130,6 +120,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             lock (_heartBeatManager)
             {
                 _result = Optional<byte[]>.OfNullable(result);
+                _taskLifeCycle.Stop();
+
                 switch (State)
                 {
                     case TaskState.SuspendRequested:
@@ -140,7 +132,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                         State = TaskState.Done;
                         break;
                 }
-                _taskLifeCycle.Stop();
                 Heartbeat();
             }
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopHandlerException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopHandlerException.cs
 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopHandlerException.cs
new file mode 100644
index 0000000..f86605d
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStopHandlerException.cs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+    /// <summary>
+    /// An Exception that indicates that the TaskStopHandlers have triggered 
an Exception.
+    /// </summary>
+    internal sealed class TaskStopHandlerException : Exception
+    {
+        internal TaskStopHandlerException(string message, Exception inner) : 
base(message, inner)
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
index bfc91c1..f9a16dd 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestContextStack.cs
@@ -106,8 +106,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 {
                     case ContextOneId:
                         var contextConfig =
-                            
Common.Context.ContextConfiguration.ConfigurationModule.Set(
-                                
Common.Context.ContextConfiguration.Identifier, ContextTwoId)
+                            
REEF.Common.Context.ContextConfiguration.ConfigurationModule.Set(
+                                
REEF.Common.Context.ContextConfiguration.Identifier, ContextTwoId)
                                 .Build();
                         var stackingContextConfig =
                             TangFactory.GetTang()
@@ -193,9 +193,9 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 {
                     case ContextOneId:
                         var contextConfig =
-                            
Common.Context.ContextConfiguration.ConfigurationModule
-                                
.Set(Common.Context.ContextConfiguration.Identifier, ContextTwoId)
-                                
.Set(Common.Context.ContextConfiguration.OnContextStart, 
GenericType<TestContextStackContextStartHandler>.Class)
+                            
REEF.Common.Context.ContextConfiguration.ConfigurationModule
+                                
.Set(REEF.Common.Context.ContextConfiguration.Identifier, ContextTwoId)
+                                
.Set(REEF.Common.Context.ContextConfiguration.OnContextStart, 
GenericType<TestContextStackContextStartHandler>.Class)
                                 .Build();
 
                         var stackingContextConfig =
@@ -287,8 +287,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
 
             public void OnNext(IAllocatedEvaluator value)
             {
-                
value.SubmitContext(Common.Context.ContextConfiguration.ConfigurationModule
-                    .Set(Common.Context.ContextConfiguration.Identifier, 
ContextOneId)
+                
value.SubmitContext(REEF.Common.Context.ContextConfiguration.ConfigurationModule
+                    .Set(REEF.Common.Context.ContextConfiguration.Identifier, 
ContextOneId)
                     .Build());
             }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs
new file mode 100644
index 0000000..6e66dac
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/ExceptionThrowingHandler.cs
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Tests.Functional.Common.Task.Handlers
+{
+    /// <summary>
+    /// A helper test class that implements <see cref="IObserver{T}"/>, which 
throws an
+    /// Exception after executing an optional Action provided by the caller of 
the constructor.
+    /// </summary>
+    internal abstract class ExceptionThrowingHandler<T> : IObserver<T>
+    {
+        private readonly Exception _exceptionToThrow;
+        private readonly Action<T> _action;
+
+        protected ExceptionThrowingHandler(
+            Exception exceptionToThrow, Action<T> action = null)
+        {
+            _exceptionToThrow = exceptionToThrow;
+            _action = action;
+        }
+
+        public void OnNext(T value)
+        {
+            if (_action != null)
+            {
+                _action(value);
+            }
+
+            throw _exceptionToThrow;
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/LoggingHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/LoggingHandler.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/LoggingHandler.cs
new file mode 100644
index 0000000..4133310
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/Handlers/LoggingHandler.cs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Common.Task.Handlers
+{
+    /// <summary>
+    /// A helper test class that implements <see cref="IObserver{T}"/>, which 
logs
+    /// a message provided by the caller of the constructor.
+    /// </summary>
+    public abstract class LoggingHandler<T> : IObserver<T>
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(LoggingHandler<>));
+        
+        private readonly string _messageToLog;
+
+        protected LoggingHandler(string messageToLog)
+        {
+            _messageToLog = messageToLog;
+        } 
+
+        public void OnNext(T value)
+        {
+            Logger.Log(Level.Info, _messageToLog);
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/LoggingTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/LoggingTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/LoggingTask.cs
new file mode 100644
index 0000000..04ba961
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/LoggingTask.cs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.Common.Task
+{
+    /// <summary>
+    /// A helper test class that implements <see cref="ITask"/>, which logs
+    /// a message provided by the caller of the constructor.
+    /// </summary>
+    public abstract class LoggingTask : ITask
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(LoggingTask));
+
+        private readonly string _messageToLog;
+
+        protected LoggingTask(string messageToLog)
+        {
+            _messageToLog = messageToLog;
+        }
+
+        public void Dispose()
+        {
+        }
+
+        public byte[] Call(byte[] memento)
+        {
+            Logger.Log(Level.Info, _messageToLog);
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
new file mode 100644
index 0000000..84aba1f
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional.Common.Task;
+using Org.Apache.REEF.Tests.Functional.Common.Task.Handlers;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Failure.User
+{
+    /// <summary>
+    /// This test class contains a test that validates that an Exception in 
the 
+    /// TaskStopHandler causes a FailedTask event in the Driver.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public sealed class TaskStopExceptionTest : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TaskStopExceptionTest));
+
+        private const string TaskStopExceptionMessage = 
"TaskStopExceptionMessage";
+        private const string InitialTaskMessage = "InitialTaskMessage";
+        private const string ResubmitTaskMessage = "ResubmitTaskMessage";
+        private const string FailedTaskReceived = "FailedTaskReceived";
+        private const string CompletedTaskReceived = "CompletedTaskReceived";
+
+        /// <summary>
+        /// This test validates that an Exception in the TaskStopHandler 
causes a FailedTask
+        /// event in the Driver, and that a new Task can be submitted on the 
original Context.
+        /// </summary>
+        [Fact]
+        public void TestStopTaskWithExceptionOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, 
GenericType<TaskStopExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<TaskStopExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, 
GenericType<TaskStopExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskFailed, 
GenericType<TaskStopExceptionTestDriver>.Class)
+                .Build(), typeof(TaskStopExceptionTestDriver), 1, 
"testStopTaskWithExceptionOnLocalRuntime", "local", testFolder);
+
+            var driverMessages = new List<string>
+            {
+                FailedTaskReceived,
+                CompletedTaskReceived
+            };
+
+            ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, 
testFolder, 1);
+            ValidateMessageSuccessfullyLogged(driverMessages, "driver", 
DriverStdout, testFolder, 1);
+
+            var evaluatorMessages = new List<string> { InitialTaskMessage, 
ResubmitTaskMessage };
+            ValidateMessageSuccessfullyLogged(evaluatorMessages, "Node-*", 
EvaluatorStdout, testFolder, 1);
+            CleanUp(testFolder);
+        }
+
+        private sealed class TaskStopExceptionTestDriver : 
+            IObserver<IDriverStarted>, 
+            IObserver<IAllocatedEvaluator>, 
+            IObserver<ICompletedTask>, 
+            IObserver<IFailedTask>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+
+            [Inject]
+            private TaskStopExceptionTestDriver(IEvaluatorRequestor requestor)
+            {
+                _requestor = requestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                // submit the first Task.
+                value.SubmitTask(TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, "TaskID")
+                        .Set(TaskConfiguration.Task, 
GenericType<TaskStopExceptionTask>.Class)
+                        .Set(TaskConfiguration.OnTaskStop, 
GenericType<TaskStopHandlerWithException>.Class)
+                        .Build());
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                // Should only receive one CompletedTask, as validated.
+                Logger.Log(Level.Info, CompletedTaskReceived);
+                value.ActiveContext.Dispose();
+            }
+
+            public void OnNext(IFailedTask value)
+            {
+                // Check that Exceptions are deserialized correctly.
+                var ex = value.AsError();
+                if (ex == null)
+                {
+                    throw new Exception("Exception was not expected to be 
null.");
+                }
+
+                var taskStopEx = ex as TaskStopExceptionTestException;
+
+                if (taskStopEx == null)
+                {
+                    throw new Exception("Expected Exception to be of type 
TaskStopExceptionTestException, but instead got type " + ex.GetType().Name);
+                }
+
+                if (taskStopEx.Message != TaskStopExceptionMessage)
+                {
+                    throw new Exception(
+                        "Expected message to be " + TaskStopExceptionMessage + 
" but instead got " + taskStopEx.Message + ".");
+                }
+
+                Logger.Log(Level.Info, FailedTaskReceived);
+
+                // Submit the new Task to verify that the original Context 
accepts new Tasks.
+                value.GetActiveContext().Value.SubmitTask(
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, "TaskID")
+                        .Set(TaskConfiguration.Task, 
GenericType<TaskStopExceptionResubmitTask>.Class)
+                        .Build());
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class TaskStopExceptionTask : LoggingTask
+        {
+            [Inject]
+            private TaskStopExceptionTask() 
+                : base(InitialTaskMessage)
+            {
+            }
+        }
+
+        private sealed class TaskStopExceptionResubmitTask : LoggingTask
+        {
+            [Inject]
+            private TaskStopExceptionResubmitTask() 
+                : base(ResubmitTaskMessage)
+            {
+            }
+        }
+
+        private sealed class TaskStopHandlerWithException : 
ExceptionThrowingHandler<ITaskStop>
+        {
+            [Inject]
+            private TaskStopHandlerWithException() : 
+                base(new 
TaskStopExceptionTestException(TaskStopExceptionMessage))
+            {
+            }
+        }
+
+        /// <summary>
+        /// A Serializable Exception to verify that the Exception is 
deserialized correctly.
+        /// </summary>
+        [Serializable]
+        private sealed class TaskStopExceptionTestException : Exception
+        {
+            public TaskStopExceptionTestException(string message) : 
base(message)
+            {
+            }
+
+            private TaskStopExceptionTestException(SerializationInfo info, 
StreamingContext context)
+                : base(info, context)
+            {
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
index d71f20d..149cc9b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs
@@ -196,6 +196,19 @@ namespace Org.Apache.REEF.Tests.Functional
         }
 
         /// <summary>
+        /// See <see cref="ValidateMessageSuccessfullyLogged"/> for detail. 
This function is <see cref="ValidateMessageSuccessfullyLogged"/>
+        /// for the driver log.
+        /// </summary>
+        protected void ValidateMessagesSuccessfullyLoggedForDriver(
+            IEnumerable<string> messages,
+            string testFolder,
+            int numberOfOccurrences = 1)
+        {
+            var msgs = new List<string>(messages);
+            ValidateMessageSuccessfullyLogged(msgs, "driver", DriverStdout, 
testFolder, numberOfOccurrences);
+        }
+
+        /// <summary>
         /// Validates that each of the message provided in the <see 
cref="messages"/> parameter occurs 
         /// some number of times.
         /// If <see cref="numberOfOccurrences"/> is greater than or equal to 
0, validates that each of the message in 
@@ -203,7 +216,8 @@ namespace Org.Apache.REEF.Tests.Functional
         /// If <see cref="numberOfOccurrences"/> is less than 0, validates 
that each of the message in <see cref="messages"/>
         /// occur at least once.
         /// </summary>
-        protected void ValidateMessageSuccessfullyLogged(IList<string> 
messages, string subfolder, string fileName, string testFolder, int 
numberOfOccurrences = 1)
+        protected void ValidateMessageSuccessfullyLogged(
+            IEnumerable<string> messages, string subfolder, string fileName, 
string testFolder, int numberOfOccurrences = 1)
         {
             string[] lines = ReadLogFile(fileName, subfolder, testFolder);
             foreach (string message in messages)

http://git-wip-us.apache.org/repos/asf/reef/blob/d6dbea6d/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 1c6949a..0746e12 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -86,6 +86,9 @@ under the License.
     <Compile Include="Functional\Bridge\TestSimpleEventHandlers.cs" />
     <Compile Include="Functional\Bridge\TestSuspendTask.cs" />
     <Compile Include="Functional\Bridge\TestUnhandledTaskException.cs" />
+    <Compile Include="Functional\Common\Task\Handlers\LoggingHandler.cs" />
+    <Compile Include="Functional\Common\Task\LoggingTask.cs" />
+    <Compile 
Include="Functional\Common\Task\Handlers\ExceptionThrowingHandler.cs" />
     <Compile Include="Functional\Driver\DriverTestStartHandler.cs" />
     <Compile 
Include="Functional\Failure\BasePoisonedEvaluatorWithActiveContextDriver.cs" />
     <Compile 
Include="Functional\Failure\BasePoisonedEvaluatorWithRunningTaskDriver.cs" />
@@ -97,6 +100,7 @@ under the License.
     <Compile 
Include="Functional\Failure\TestEvaluatorWithRunningTaskImmediatePoison.cs" />
     <Compile Include="Functional\Failure\SleepTask.cs" />
     <Compile Include="Functional\Failure\User\TaskConstructorExceptionTest.cs" 
/>
+    <Compile Include="Functional\Failure\User\TaskStopExceptionTest.cs" />
     <Compile Include="Functional\FaultTolerant\TestContextStart.cs" />
     <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" />
     <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" />

Reply via email to