Repository: reef
Updated Branches:
  refs/heads/master b5647362a -> 414d4b451


[REEF-1392] Adding IObserver<ICloseEvent> for IMRU tasks

* IMRU tasks implement IObserver<ICloseEvent>
* Task will return after it receives the close event if possible or enforce to 
close
* Update TaskStateMachine to allow transit from TaskWaitingForClose with 
CompletedTask event to TaskClosedByDriver state
* Adding test cases

JIRA: [REEF-1392](https://issues.apache.org/jira/browse/REEF-1392)
This closes  #1009


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/414d4b45
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/414d4b45
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/414d4b45

Branch: refs/heads/master
Commit: 414d4b45114c6229720262f3588ecff3a35334b1
Parents: b564736
Author: Julia Wang <[email protected]>
Authored: Wed Jun 1 19:28:50 2016 -0700
Committer: dhruv <[email protected]>
Committed: Wed Jun 1 22:47:08 2016 -0700

----------------------------------------------------------------------
 .../TestTaskStates.cs                           |  17 +-
 .../OnREEF/Driver/IMRUDriver.cs                 |   3 +
 .../Driver/StateMachine/TaskStateMachine.cs     |   1 +
 .../OnREEF/Driver/TaskManager.cs                |  25 +--
 .../OnREEF/IMRUTasks/MapTaskHost.cs             |  77 ++++++-
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          |  81 ++++++-
 .../EnforceCloseTimeoutMilliseconds.cs          |  31 +++
 .../Org.Apache.REEF.IMRU.csproj                 |   1 +
 .../Functional/Bridge/TestCloseTask.cs          | 218 +++++++++++++++++--
 .../Org.Apache.REEF.Tests.csproj                |   2 +-
 10 files changed, 413 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs
index 0c858da..561202e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs
@@ -125,14 +125,25 @@ namespace Org.Apache.REEF.IMRU.Tests
             Action moveNext = () => 
taskState.MoveNext(TaskStateEvent.RunningTask);
             Assert.Throws<TaskStateTransitionException>(moveNext);
 
-            moveNext = () => taskState.MoveNext(TaskStateEvent.CompletedTask);
-            Assert.Throws<TaskStateTransitionException>(moveNext);
-
             moveNext = () => taskState.MoveNext(TaskStateEvent.SubmittedTask);
             Assert.Throws<TaskStateTransitionException>(moveNext);
         }
 
         /// <summary>
+        /// This is to test from WaitingTaskToClose to receiving CompletedTask 
event
+        /// </summary>
+        [Fact]
+        public void TestRunningToWaitingTaskToCloseToComplete()
+        {
+            var taskState = new TaskStateMachine();
+            taskState.MoveNext(TaskStateEvent.SubmittedTask);
+            taskState.MoveNext(TaskStateEvent.RunningTask);
+            taskState.MoveNext(TaskStateEvent.WaitingTaskToClose);
+            taskState.MoveNext(TaskStateEvent.CompletedTask);
+            Assert.Equal(TaskState.TaskClosedByDriver, taskState.CurrentState);
+        }
+
+        /// <summary>
         /// This is to test from RunningTask to TaskFailedByEvaluatorFailure.
         /// </summary>
         [Fact]

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 59be761..58b75ed 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -329,6 +329,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                 .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule
                     .Set(TaskConfiguration.Identifier, taskId)
                     .Set(TaskConfiguration.Task, 
GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
+                    .Set(TaskConfiguration.OnClose, 
GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
                     .Build(),
                     _configurationManager.MapFunctionConfiguration,
                     mapSpecificConfig,
@@ -351,6 +352,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                             IMRUConstants.UpdateTaskName)
                         .Set(TaskConfiguration.Task,
                             GenericType<UpdateTaskHost<TMapInput, TMapOutput, 
TResult>>.Class)
+                        .Set(TaskConfiguration.OnClose,
+                            GenericType<UpdateTaskHost<TMapInput, TMapOutput, 
TResult>>.Class)
                         .Build(),
                         _configurationManager.UpdateFunctionConfiguration,
                         _configurationManager.ResultHandlerConfiguration,

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs
index 17c0764..b1ed8dc 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs
@@ -50,6 +50,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine
             { new StateTransition<TaskState, 
TaskStateEvent>(TaskState.TaskWaitingForClose, 
TaskStateEvent.FailedTaskSystemError), TaskState.TaskClosedByDriver },
             { new StateTransition<TaskState, 
TaskStateEvent>(TaskState.TaskWaitingForClose, 
TaskStateEvent.FailedTaskEvaluatorError), TaskState.TaskClosedByDriver },
             { new StateTransition<TaskState, 
TaskStateEvent>(TaskState.TaskWaitingForClose, 
TaskStateEvent.FailedTaskCommunicationError), TaskState.TaskClosedByDriver },
+            { new StateTransition<TaskState, 
TaskStateEvent>(TaskState.TaskWaitingForClose, TaskStateEvent.CompletedTask), 
TaskState.TaskClosedByDriver },
             { new StateTransition<TaskState, 
TaskStateEvent>(TaskState.TaskFailedBySystemError, 
TaskStateEvent.FailedTaskEvaluatorError), 
TaskState.TaskFailedByEvaluatorFailure },
             { new StateTransition<TaskState, 
TaskStateEvent>(TaskState.TaskFailedByGroupCommunication, 
TaskStateEvent.FailedTaskEvaluatorError), 
TaskState.TaskFailedByEvaluatorFailure }
         });

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index 584809b..3bf6d75 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -201,13 +201,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
         /// <summary>
         /// This method is called when receiving ICompletedTask event during 
task running or system shutting down.
-        /// Removes the task from running tasks
+        /// Removes the task from running tasks if it was running
         /// Changes the task state from RunningTask to CompletedTask
         /// </summary>
         /// <param name="completedTask"></param>
         internal void RecordCompletedTask(ICompletedTask completedTask)
         {
-            RemoveRunningTask(completedTask.Id);
+            _runningTasks.Remove(completedTask.Id);
             UpdateState(completedTask.Id, TaskStateEvent.CompletedTask);
         }
 
@@ -258,7 +258,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                 var taskState = GetTaskState(taskId);
                 if (taskState == StateMachine.TaskState.TaskRunning)
                 {
-                    RemoveRunningTask(taskId);
+                    if (!_runningTasks.ContainsKey(taskId))
+                    {
+                        var msg = string.Format(CultureInfo.InvariantCulture, 
"The task [{0}] doesn't exist in Running Tasks.", taskId);
+                        Exceptions.Throw(new IMRUSystemException(msg), Logger);
+                    }
+                    _runningTasks.Remove(taskId);
                 }
 
                 UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError);
@@ -266,20 +271,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Removes a task from running tasks if it exists in the running 
tasks collection
-        /// </summary>
-        /// <param name="taskId"></param>
-        private void RemoveRunningTask(string taskId)
-        {
-            if (!_runningTasks.ContainsKey(taskId))
-            {
-                var msg = string.Format(CultureInfo.InvariantCulture, "The 
task [{0}] doesn't exist in Running Tasks.", taskId);
-                Exceptions.Throw(new IMRUSystemException(msg), Logger);
-            }
-            _runningTasks.Remove(taskId);
-        }
-
-        /// <summary>
         /// Updates task state for a given taskId based on the task event
         /// </summary>
         /// <param name="taskId"></param>

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
index c4a101d..5f9823a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -16,7 +16,10 @@
 // under the License.
 
 using System;
+using System.Text;
+using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
 using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
@@ -24,6 +27,7 @@ using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
@@ -33,7 +37,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
     /// </summary>
     /// <typeparam name="TMapInput">Map input</typeparam>
     /// <typeparam name="TMapOutput">Map output</typeparam>
-    internal sealed class MapTaskHost<TMapInput, TMapOutput> : ITask
+    [ThreadSafe]
+    internal sealed class MapTaskHost<TMapInput, TMapOutput> : ITask, 
IObserver<ICloseEvent>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(MapTaskHost<TMapInput, TMapOutput>));
 
@@ -43,14 +48,38 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly bool _invokeGC;
 
         /// <summary>
+        /// When receiving a close event, this variable is set to 1. At the 
beginning of each task iteration,
+        /// if this variable is set to 1, the task will break from the loop 
and return from the Call() method.
+        /// </summary>
+        private long _shouldCloseTask = 0;
+
+        /// <summary>
+        /// Before the task is returned, this variable is set to 1.
+        /// Close handler will check this variable to decide if it needs to 
throw an exception.
+        /// </summary>
+        private long _isTaskStopped = 0;
+
+        /// <summary>
+        /// Waiting time for the task to close by itself
+        /// </summary>
+        private readonly int _enforceCloseTimeoutMilliseconds;
+
+        /// <summary>
+        /// An event that will wait in close handler until it is either 
signaled from Call method or timeout.
+        /// </summary>
+        private readonly ManualResetEventSlim _waitToCloseEvent = new 
ManualResetEventSlim(false);
+
+        /// <summary>
         /// </summary>
         /// <param name="mapTask">The MapTask hosted in this REEF Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
+        /// <param name="enforceCloseTimeoutMilliseconds">Timeout to enforce 
the task to close if receiving task close event</param>
         /// <param name="invokeGC">Whether to call Garbage Collector after 
each iteration or not</param>
         [Inject]
         private MapTaskHost(
             IMapFunction<TMapInput, TMapOutput> mapTask,
             IGroupCommClient groupCommunicationsClient,
+            [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeoutMilliseconds,
             [Parameter(typeof(InvokeGC))] bool invokeGC)
         {
             _mapTask = mapTask;
@@ -59,6 +88,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                 
cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
             _dataReducer = 
cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName);
             _invokeGC = invokeGC;
+            _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds;
         }
 
         /// <summary>
@@ -68,7 +98,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <returns></returns>
         public byte[] Call(byte[] memento)
         {
-            while (true)
+            while (Interlocked.Read(ref _shouldCloseTask) == 0)
             {
                 if (_invokeGC)
                 {
@@ -91,14 +121,55 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
              
                 _dataReducer.Send(result);
             }
+
+            Interlocked.Exchange(ref _isTaskStopped, 1);
+
+            if (Interlocked.Read(ref _shouldCloseTask) == 1)
+            {
+                _waitToCloseEvent.Set();
+            }
             return null;
         }
 
         /// <summary>
-        /// Dispose function 
+        /// Task close handler.
+        /// If the closed event is sent from driver, set _shouldCloseTask to 1 
so that to inform the Call() to stop at the end of the current iteration.
+        /// Then waiting for the signal from Call method. Either it is 
signaled or after _enforceCloseTimeoutMilliseconds,
+        /// checks if the task has been stopped. If not, throw 
IMRUTaskSystemException to enforce the task to stop.
+        /// </summary>
+        /// <param name="closeEvent"></param>
+        public void OnNext(ICloseEvent closeEvent)
+        {
+            var msg = Encoding.UTF8.GetString(closeEvent.Value.Value);
+            if (closeEvent.Value.IsPresent() && 
msg.Equals(TaskManager.CloseTaskByDriver))
+            {
+                Logger.Log(Level.Info, "The task received close event with 
message: {0}.", msg);
+                Interlocked.Exchange(ref _shouldCloseTask, 1);
+
+                
_waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
+
+                if (Interlocked.Read(ref _isTaskStopped) == 0)
+                {
+                    throw new 
IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Dispose function
         /// </summary>
         public void Dispose()
         {
         }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index f03a8e1..4f9ad9b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -16,7 +16,10 @@
 // under the License.
 
 using System;
+using System.Text;
+using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
 using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
@@ -24,6 +27,7 @@ using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
@@ -34,7 +38,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
     /// <typeparam name="TMapInput">Map input</typeparam>
     /// <typeparam name="TMapOutput">Map output</typeparam>
     /// <typeparam name="TResult">Final result</typeparam>
-    internal sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : 
ITask
+    [ThreadSafe]
+    internal sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : 
ITask, IObserver<ICloseEvent>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(UpdateTaskHost<TMapInput, TMapOutput, TResult>));
 
@@ -45,16 +50,40 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly IIMRUResultHandler<TResult> _resultHandler;
 
         /// <summary>
+        /// When receiving a close event, this variable is set to 1. At the 
beginning of each task iteration,
+        /// if this variable is set to 1, the task will break from the loop 
and return from the Call() method.
+        /// </summary>
+        private long _shouldCloseTask = 0;
+
+        /// <summary>
+        /// Before the task is returned, this variable is set to 1.
+        /// Close handler will check this variable to decide if it needs to 
throw an exception.
+        /// </summary>
+        private long _isTaskStopped = 0;
+
+        /// <summary>
+        /// Waiting time for the task to close by itself
+        /// </summary>
+        private readonly int _enforceCloseTimeoutMilliseconds;
+
+        /// <summary>
+        /// An event that will wait in close handler until it is either 
signaled from Call method or timeout.
+        /// </summary>
+        private readonly ManualResetEventSlim _waitToCloseEvent = new 
ManualResetEventSlim(false);
+
+        /// <summary>
         /// </summary>
         /// <param name="updateTask">The UpdateTask hosted in this REEF 
Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
         /// <param name="resultHandler">Result handler</param>
+        /// <param name="enforceCloseTimeoutMilliseconds">Timeout in 
milliseconds to enforce the task to close if receiving task close event</param>
         /// <param name="invokeGC">Whether to call Garbage Collector after 
each iteration or not</param>
         [Inject]
         private UpdateTaskHost(
             IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
             IGroupCommClient groupCommunicationsClient,
             IIMRUResultHandler<TResult> resultHandler,
+            [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeoutMilliseconds,
             [Parameter(typeof(InvokeGC))] bool invokeGC)
         {
             _updateTask = updateTask;
@@ -64,6 +93,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _dataReceiver = 
cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName);
             _invokeGC = invokeGC;
             _resultHandler = resultHandler;
+            _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds;
         }
 
         /// <summary>
@@ -76,7 +106,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             var updateResult = _updateTask.Initialize();
             int iterNo = 0;
 
-            while (updateResult.HasMapInput)
+            while (updateResult.HasMapInput && Interlocked.Read(ref 
_shouldCloseTask) == 0)
             {
                 iterNo++;
 
@@ -104,19 +134,62 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                 }
             }
 
-            MapInputWithControlMessage<TMapInput> stopMessage =
+            if (Interlocked.Read(ref _shouldCloseTask) == 0)
+            {
+                MapInputWithControlMessage<TMapInput> stopMessage =
                     new 
MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
-            _dataAndControlMessageSender.Send(stopMessage);
+                _dataAndControlMessageSender.Send(stopMessage);
+            }
 
             _resultHandler.Dispose();
+            Interlocked.Exchange(ref _isTaskStopped, 1);
+
+            if (Interlocked.Read(ref _shouldCloseTask) == 1)
+            {
+                _waitToCloseEvent.Set();
+            }
             return null;
         }
 
         /// <summary>
+        /// Task close handler.
+        /// If the closed event is sent from driver, set _shouldCloseTask to 1 
so that to inform the Call() to stop at the end of the current iteration.
+        /// Then waiting for the signal from Call method. Either it is 
signaled or after _enforceCloseTimeoutMilliseconds,
+        /// checks if the task has been stopped. If not, throw 
IMRUTaskSystemException to enforce the task to stop.
+        /// </summary>
+        /// <param name="closeEvent"></param>
+        public void OnNext(ICloseEvent closeEvent)
+        {
+            var msg = Encoding.UTF8.GetString(closeEvent.Value.Value);
+            if (closeEvent.Value.IsPresent() && 
msg.Equals(TaskManager.CloseTaskByDriver))
+            {
+                Logger.Log(Level.Info, "The task received close event with 
message: {0}.", msg);
+                Interlocked.Exchange(ref _shouldCloseTask, 1);
+
+                
_waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
+
+                if (Interlocked.Read(ref _isTaskStopped) == 0)
+                {
+                    throw new 
IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
+                }
+            }
+        }
+
+        /// <summary>
         /// Dispose function
         /// </summary>
         public void Dispose()
         {
         }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
new file mode 100644
index 0000000..e177895
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    /// <summary>
+    /// When driver sends close event to a task, it would expect the task to 
close gracefully. 
+    /// After specified time out, if the task is still not closed, the close 
handler will throw exception, 
+    /// enforce the task to close after waiting for this much time (in 
milliseconds). 
+    /// </summary>
+    [NamedParameter("Enforce the task to close after waiting for this much 
time (in milliseconds).", "EnforceCloseTimeout", "1000")]
+    internal sealed class EnforceCloseTimeoutMilliseconds : Name<int>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index da19271..3f62b1f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -99,6 +99,7 @@ under the License.
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs" 
/>
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs"
 />
+    <Compile Include="OnREEF\Parameters\EnforceCloseTimeoutMilliseconds.cs" />
     <Compile Include="OnREEF\Parameters\InvokeGC .cs" />
     <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" />
     <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
index a18a211..2e97320 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs
@@ -27,6 +27,9 @@ using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
 using Org.Apache.REEF.Driver.Evaluator;
 using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
@@ -51,6 +54,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         private const string NoMessage = "NO_MESSAGE";
         private const string CompletedValidationMessage = 
"CompletedValidationmessage";
         private const string FailToCloseTaskMessage = "FailToCloseTaskMessage";
+        private const string BreakTaskMessage = "BreakTaskMessage";
+        private const string EnforceToCloseMessage = "EnforceToCloseMessage";
 
         /// <summary>
         /// This test is close a running task with a close handler registered
@@ -69,6 +74,40 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         }
 
         /// <summary>
+        /// This test is to close a running task and enforce it to break and 
return after the current iteration
+        /// </summary>
+        [Fact]
+        public void TestBreakTaskOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfigurations(DisposeMessageFromDriver, 
GetTaskConfigurationForBreakTask()), typeof(CloseTaskTestDriver), 1, 
"TestBreakTask", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, testFolder: testFolder);
+            
ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, 
testFolder, 1);
+            var messages = new List<string>();
+            messages.Add(DisposeMessageFromDriver);
+            messages.Add(BreakTaskMessage);
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, -1);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// This test is to close a running task and enforce it to break and 
return after the current iteration
+        /// </summary>
+        [Fact]
+        public void TestEnforceCloseTaskOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfigurations(DisposeMessageFromDriver, 
GetTaskConfigurationForEnforceToCloseTask()), typeof(CloseTaskTestDriver), 1, 
"TestEnforceCloseTask", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, 1, 0, testFolder);
+            
ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, 
testFolder, 0);
+            var messages = new List<string>();
+            messages.Add(DisposeMessageFromDriver);
+            messages.Add(EnforceToCloseMessage);
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, -1);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
         /// This test is to close a running task with exception throw in close 
handler
         /// Expect to receive Exception in Failed Task event handler in driver
         /// </summary>
@@ -128,8 +167,31 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         {
             return TaskConfiguration.ConfigurationModule
                 .Set(TaskConfiguration.Identifier, "TaskID")
-                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseTestTask>.Class)
-                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseTestTask>.Class)
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseByReturnTestTask>.Class)
+                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseByReturnTestTask>.Class)
+                .Build();
+        }
+
+        private IConfiguration GetTaskConfigurationForBreakTask()
+        {
+            return TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "TaskID")
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
+                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
+                .Build();
+        }
+        private IConfiguration GetTaskConfigurationForEnforceToCloseTask()
+        {
+            var taskConfig = TaskConfiguration.ConfigurationModule
+                .Set(TaskConfiguration.Identifier, "TaskID-EnforceToClose")
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
+                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class)
+                .Build();
+
+            return TangFactory.GetTang()
+                .NewConfigurationBuilder(taskConfig)
+                .BindIntNamedParam<EnforceCloseTimeoutMilliseconds>("1000")
+                .BindNamedParameter<EnforceClose, 
bool>(GenericType<EnforceClose>.Class, "true")
                 .Build();
         }
 
@@ -137,8 +199,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         {
             return TaskConfiguration.ConfigurationModule
                 .Set(TaskConfiguration.Identifier, "TaskID-FailToClose")
-                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.FailToCloseTask>.Class)
-                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.FailToCloseTask>.Class)
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.CloseByThrowExceptionTask>.Class)
+                .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseByThrowExceptionTask>.Class)
                 .Build();
         }
 
@@ -146,7 +208,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         {
             return TaskConfiguration.ConfigurationModule
                 .Set(TaskConfiguration.Identifier, "TaskID-NoCloseHandler")
-                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.NoCloseHandlerTask>.Class)
+                .Set(TaskConfiguration.Task, 
GenericType<TestCloseTask.MissingCloseHandlerTask>.Class)
                 .Build();
         }
 
@@ -194,14 +256,14 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
         {
             private readonly IEvaluatorRequestor _requestor;
             private int _contextNumber = 0;
-            private string _disposeMessage;
-            private IConfiguration _taskConfiguration;
+            private readonly string _disposeMessage;
+            private readonly IConfiguration _taskConfiguration;
 
             [Inject]
             private CloseTaskTestDriver(IEvaluatorRequestor evaluatorRequestor,
                 [Parameter(typeof(DisposeMessage))] string disposeMessage,
                 [Parameter(typeof(TaskConfigurationString))] string 
taskConfigString,
-                AvroConfigurationSerializer avroConfigurationSerializer)
+                IConfigurationSerializer avroConfigurationSerializer)
             {
                 _requestor = evaluatorRequestor;
                 _disposeMessage = disposeMessage;
@@ -246,7 +308,11 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 {
                     Assert.Contains(DefaultTaskCloseHandler.ExceptionMessage, 
failedExeption);
                 }
-                
+                if (value.Id.EndsWith("TaskID-EnforceToClose"))
+                {
+                    Assert.Contains(TaskManager.TaskKilledByDriver, 
failedExeption);
+                }
+
                 value.GetActiveContext().Value.Dispose();
             }
 
@@ -274,12 +340,15 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             }
         }
 
-        private sealed class CloseTestTask : ITask, IObserver<ICloseEvent>
+        /// <summary>
+        /// This test task receives close event, then signals Call() method to 
properly return.
+        /// </summary>
+        private sealed class CloseByReturnTestTask : ITask, 
IObserver<ICloseEvent>
         {
             private readonly CountdownEvent _suspendSignal = new 
CountdownEvent(1);
 
             [Inject]
-            private CloseTestTask()
+            private 
CloseByReturnTestTask([Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeout)
             {
             }
 
@@ -322,12 +391,123 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             }
         }
 
-        private sealed class FailToCloseTask : ITask, IObserver<ICloseEvent>
+        /// <summary>
+        /// This is a testing task. It serves for two test cases.
+        /// In the first case, EnforceClose is false (default). When the task 
receives the close event, it signals the Call method
+        /// to let it continue the iteration. As _shouldCloseTask is set to 1, 
the Call() will return after
+        /// completing the current iteration.
+        /// In the second case, EnforceClose is set to true. When the task 
receives the close event, it sets
+        /// _shouldCloseTask to 1. As the task is hung in this scenario, 
Call() would never return.
+        ///  After waiting for _enforceCloseTimeoutMilliseconds, the close 
handler throws an exception, enforcing the task to stop.
+        /// </summary>
+        private sealed class CloseByBreakAndEnforceToStopTask : ITask, 
IObserver<ICloseEvent>
+        {
+            private long _shouldCloseTask = 0;
+            private long _isTaskStopped = 0;
+            private readonly bool _enforceClose;
+            private readonly int _enforceCloseTimeoutMilliseconds;
+
+            private readonly CountdownEvent _suspendSignal1 = new 
CountdownEvent(1);
+            private readonly CountdownEvent _suspendSignal2 = new 
CountdownEvent(1);
+            private readonly ManualResetEventSlim _waitToCloseEvent = new 
ManualResetEventSlim(false);
+
+            [Inject]
+            private CloseByBreakAndEnforceToStopTask(
+                [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeoutMilliseconds,
+                [Parameter(typeof(EnforceClose))] bool enforceClose)
+            {
+                _enforceClose = enforceClose;
+                _enforceCloseTimeoutMilliseconds = 
enforceCloseTimeoutMilliseconds;
+            }
+
+            public byte[] Call(byte[] memento)
+            {
+                int iterate = 1;
+
+                while (Interlocked.Read(ref _shouldCloseTask) == 0 && iterate 
< 100)
+                {
+                    iterate++;
+                    if (_enforceClose)
+                    {
+                        _suspendSignal1.Wait();
+                    }
+                    else
+                    {
+                        _suspendSignal2.Wait();
+                    }
+                }
+
+                Interlocked.Exchange(ref _isTaskStopped, 1);
+
+                if (Interlocked.Read(ref _shouldCloseTask) == 1)
+                {
+                    Logger.Log(Level.Info, BreakTaskMessage);
+                    _waitToCloseEvent.Set();
+                }
+
+                return null;
+            }
+
+            public void Dispose()
+            {
+                Logger.Log(Level.Info, "Task is disposed.");
+            }
+
+            /// <summary>
+            /// When the close event is received, it sets _shouldCloseTask to 
1.
+            /// If _enforceClose is false, _suspendSignal2 is signaled to let 
the task to continue to run. This is to simulate that the
+            /// task is running properly and will break after completing the 
current iteration. It will set the _waitToCloseEvent
+            /// to let the flow in the close event handler to continue.
+            /// If _enforceClose is true,  _suspendSignal1 will be not 
signaled, this is to simulate that the task is hung.
+            /// After waiting for specified time, the close handler will throw 
exception to enforce the task to stop.
+            /// </summary>
+            /// <param name="closeEvent"></param>
+            public void OnNext(ICloseEvent closeEvent)
+            {
+                if (closeEvent.Value.IsPresent() && 
Encoding.UTF8.GetString(closeEvent.Value.Value).Equals(DisposeMessageFromDriver))
+                {
+                    Logger.Log(Level.Info, "Closed event received in task:" + 
Encoding.UTF8.GetString(closeEvent.Value.Value));
+                    Interlocked.Exchange(ref _shouldCloseTask, 1);
+                    if (!_enforceClose)
+                    {
+                        _suspendSignal2.Signal();
+                    }
+
+                    
_waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
+
+                    if (Interlocked.Read(ref _isTaskStopped) == 0)
+                    {
+                        Logger.Log(Level.Info, EnforceToCloseMessage);
+                        throw new 
IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
+                    }
+                }
+                else
+                {
+                    throw new Exception("Expected close event message is not 
received.");
+                }
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        /// <summary>
+        /// This is a test task for the scenario in which the task receives 
close event, instead of
+        /// let the task to return properly, it throws exception.
+        /// </summary>
+        private sealed class CloseByThrowExceptionTask : ITask, 
IObserver<ICloseEvent>
         {
             private readonly CountdownEvent _suspendSignal = new 
CountdownEvent(1);
 
             [Inject]
-            private FailToCloseTask()
+            private CloseByThrowExceptionTask()
             {
             }
 
@@ -370,10 +550,13 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
             }
         }
 
-        private sealed class NoCloseHandlerTask : ITask
+        /// <summary>
+        /// This task doesn't implement close handler. It is to test 
closeHandlerNoBound exception.
+        /// </summary>
+        private sealed class MissingCloseHandlerTask : ITask
         {
             [Inject]
-            private NoCloseHandlerTask()
+            private MissingCloseHandlerTask()
             {
             }
 
@@ -389,5 +572,10 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge
                 Logger.Log(Level.Info, "Task is disposed.");
             }
         }
+
+        [NamedParameter("Enforce the task to close", "EnforceClose", "false")]
+        private sealed class EnforceClose : Name<bool>
+        {
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 3840a38..932d3f2 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -183,7 +183,7 @@ under the License.
       <Project>{6dc3b04e-2b99-4fda-bd23-2c7864f4c477}</Project>
       <Name>Org.Apache.REEF.IMRU.Examples</Name>
     </ProjectReference>
-    <ProjectReference 
Include="..\Org.Apache.REEF.IMRU\Org.Apache.REEF.IMRU.csproj">
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.IMRU\Org.Apache.REEF.IMRU.csproj">
       <Project>{cc797c57-b465-4d11-98ac-edaaef5899a6}</Project>
       <Name>Org.Apache.REEF.IMRU</Name>
     </ProjectReference>

Reply via email to