Repository: reef
Updated Branches:
  refs/heads/master 68e3cb0f8 -> 6cde8ce78


[REEF-1423] Tasks are not disposed after they are closed

Updated TaskRuntime to ensure Dispose is called after the task is closed
Refactored IMRU task close handler
Added test cases.

JIRA:
  [REEF-1423](https://issues.apache.org/jira/browse/REEF-1423)

This closes #1032


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/6cde8ce7
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/6cde8ce7
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/6cde8ce7

Branch: refs/heads/master
Commit: 6cde8ce7808c3ca99b466117d83c7d8eb9606aed
Parents: 68e3cb0
Author: Julia Wang <[email protected]>
Authored: Wed Jun 8 19:42:30 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Fri Jun 17 17:52:50 2016 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |  18 ++
 lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs   |   6 +
 .../OnREEF/IMRUTasks/MapTaskHost.cs             |  73 ++---
 .../OnREEF/IMRUTasks/TaskCloseCoordinator.cs    | 115 +++++++
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          |  63 +---
 .../Org.Apache.REEF.IMRU.csproj                 |   1 +
 .../Functional/Bridge/TestDisposeTasks.cs       | 301 +++++++++++++++++++
 .../Functional/IMRU/IMRUCloseTaskTest.cs        |  30 +-
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 9 files changed, 487 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index b14e3b9..7c94615 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -181,6 +181,24 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error 
during Close.", Logger);
                 _currentStatus.SetException(e);
             }
+            finally
+            {
+                try
+                {
+                    if (_userTask != null)
+                    {
+                        _userTask.Dispose();
+                    }
+                }
+                catch (Exception e)
+                {
+                    Utilities.Diagnostics.Exceptions.CaughtAndThrow(
+                        new InvalidOperationException("Cannot dispose task 
properly", e),
+                        Level.Error,
+                        "Exception during task dispose.",
+                        Logger);
+                }
+            }
         }
 
         public void Suspend(byte[] message)

http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs 
b/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs
index cb796eb..c9d65e2 100644
--- a/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs
@@ -19,6 +19,12 @@ using System;
 
 namespace Org.Apache.REEF.Common.Tasks
 {
+    /// <summary>
+    /// Task interface. Clients should implement their own tasks. 
+    /// Each Task should implement Dispose() to release resources if any. 
+    /// Dispose will be called after Call() returns or the task is closed. 
If there is any exception during the task dispose, 
+    /// the error will be logged and the exception will be ignored as the task 
has been completed. 
+    /// </summary>
     public interface ITask : IDisposable
     {
         byte[] Call(byte[] memento);

http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
index 6f42cd5..86b1f7c 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -48,48 +48,31 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly bool _invokeGC;
 
         /// <summary>
-        /// When receiving a close event, this variable is set to 1. At the 
beginning of each task iteration,
-        /// if this variable is set to 1, the task will break from the loop 
and return from the Call() method.
-        /// </summary>
-        private long _shouldCloseTask = 0;
-
-        /// <summary>
-        /// Before the task is returned, this variable is set to 1.
-        /// Close handler will check this variable to decide if it needs to 
throw an exception.
-        /// </summary>
-        private long _isTaskStopped = 0;
-
-        /// <summary>
         /// Shows if the object has been disposed.
         /// </summary>
         private int _disposed = 0;
 
         /// <summary>
-        /// Waiting time for the task to close by itself
-        /// </summary>
-        private readonly int _enforceCloseTimeoutMilliseconds;
-
-        /// <summary>
-        /// An event that will wait in close handler until it is either 
signaled from Call method or timeout.
+        /// Group Communication client for the task
         /// </summary>
-        private readonly ManualResetEventSlim _waitToCloseEvent = new 
ManualResetEventSlim(false);
+        private readonly IGroupCommClient _groupCommunicationsClient;
 
         /// <summary>
-        /// Group Communication client for the task
+        /// Task close Coordinator to handle the work when receiving task 
close event
         /// </summary>
-        private readonly IGroupCommClient _groupCommunicationsClient;
+        private readonly TaskCloseCoordinator _taskCloseCoordinator;
 
         /// <summary>
         /// </summary>
         /// <param name="mapTask">The MapTask hosted in this REEF Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
-        /// <param name="enforceCloseTimeoutMilliseconds">Timeout to enforce 
the task to close if receiving task close event</param>
+        /// <param name="taskCloseCoordinator">Task close Coordinator</param>
         /// <param name="invokeGC">Whether to call Garbage Collector after 
each iteration or not</param>
         [Inject]
         private MapTaskHost(
             IMapFunction<TMapInput, TMapOutput> mapTask,
             IGroupCommClient groupCommunicationsClient,
-            [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeoutMilliseconds,
+            TaskCloseCoordinator taskCloseCoordinator,
             [Parameter(typeof(InvokeGC))] bool invokeGC)
         {
             _mapTask = mapTask;
@@ -99,7 +82,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                 
cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
             _dataReducer = 
cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName);
             _invokeGC = invokeGC;
-            _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds;
+            _taskCloseCoordinator = taskCloseCoordinator;
         }
 
         /// <summary>
@@ -109,7 +92,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <returns></returns>
         public byte[] Call(byte[] memento)
         {
-            while (Interlocked.Read(ref _shouldCloseTask) == 0)
+            MapControlMessage controlMessage = MapControlMessage.AnotherRound;
+
+            while (!_taskCloseCoordinator.ShouldCloseTask() && controlMessage 
!= MapControlMessage.Stop)
             {
                 if (_invokeGC)
                 {
@@ -118,56 +103,32 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                     GC.WaitForPendingFinalizers();
                 }
 
-                TMapOutput result;
-
                 using (
                     MapInputWithControlMessage<TMapInput> mapInput = 
_dataAndMessageReceiver.Receive())
                 {
-                    if (mapInput.ControlMessage == MapControlMessage.Stop)
+                    controlMessage = mapInput.ControlMessage;
+                    if (controlMessage != MapControlMessage.Stop)
                     {
-                        break;
+                        _dataReducer.Send(_mapTask.Map(mapInput.Message));
                     }
-                    result = _mapTask.Map(mapInput.Message);
                 }
-             
-                _dataReducer.Send(result);
             }
 
-            Interlocked.Exchange(ref _isTaskStopped, 1);
-
-            if (Interlocked.Read(ref _shouldCloseTask) == 1)
-            {
-                _waitToCloseEvent.Set();
-            }
+            _taskCloseCoordinator.SignalTaskStopped();
             return null;
         }
 
         /// <summary>
-        /// Task close handler.
-        /// If the closed event is sent from driver, set _shouldCloseTask to 1 
so that to inform the Call() to stop at the end of the current iteration.
-        /// Then waiting for the signal from Call method. Either it is 
signaled or after _enforceCloseTimeoutMilliseconds,
-        /// checks if the task has been stopped. If not, throw 
IMRUTaskSystemException to enforce the task to stop.
+        /// Task close handler. Calls TaskCloseCoordinator to handle the event.
         /// </summary>
         /// <param name="closeEvent"></param>
         public void OnNext(ICloseEvent closeEvent)
         {
-            var msg = Encoding.UTF8.GetString(closeEvent.Value.Value);
-            if (closeEvent.Value.IsPresent() && 
msg.Equals(TaskManager.CloseTaskByDriver))
-            {
-                Logger.Log(Level.Info, "The task received close event with 
message: {0}.", msg);
-                Interlocked.Exchange(ref _shouldCloseTask, 1);
-
-                
_waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
-
-                if (Interlocked.Read(ref _isTaskStopped) == 0)
-                {
-                    throw new 
IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
-                }
-            }
+            _taskCloseCoordinator.HandleEvent(closeEvent);
         }
 
         /// <summary>
-        /// Dispose function
+        /// Dispose function. Dispose IGroupCommunicationsClient.
         /// </summary>
         public void Dispose()
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
new file mode 100644
index 0000000..f60271a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Text;
+using System.Threading;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+    /// <summary>
+    /// This class provides a method to handle Task close event. It is called 
from TaskCloseEventHandler. 
+    /// It also wraps flags to represent if the task should be closed and if 
the task has been stopped
+    /// so that to provide a coordination between the task and the close 
handler.  
+    /// </summary>
+    [ThreadSafe]
+    internal sealed class TaskCloseCoordinator
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TaskCloseCoordinator));
+
+        /// <summary>
+        /// When a close event is received, this variable is set to 1. At the 
beginning of each task iteration,
+        /// if this variable is set to 1, the task will break from the loop 
and return from the Call() method.
+        /// </summary>
+        private long _shouldCloseTask = 0;
+
+        /// <summary>
+        /// Waiting time for the task to close by itself
+        /// </summary>
+        private readonly int _enforceCloseTimeoutMilliseconds;
+
+        /// <summary>
+        /// An event that will wait in close handler until it is either 
signaled from Call method or timeout.
+        /// </summary>
+        private readonly ManualResetEventSlim _waitToCloseEvent = new 
ManualResetEventSlim(false);
+
+        /// <summary>
+        /// Handle task close event and manage the states, wait/signal when 
closing the task
+        /// </summary>
+        /// <param name="enforceCloseTimeoutMilliseconds">Timeout in 
milliseconds to enforce the task to close if receiving task close event</param>
+        [Inject]
+        private 
TaskCloseCoordinator([Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeoutMilliseconds)
+        {
+            _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds;
+        }
+
+        /// <summary>
+        /// Handle Task close event.
+        /// Sets _shouldCloseTask to 1 to inform the task that it should stop at 
the end of the current iteration,
+        /// then waits until it is signaled from the Call method or 
_enforceCloseTimeoutMilliseconds elapses.
+        /// If the close event was sent by the driver and _waitToCloseEvent 
was not signaled in time, throws 
+        /// IMRUTaskSystemException to enforce the task to stop.
+        /// </summary>
+        /// <param name="closeEvent"></param>
+        internal void HandleEvent(ICloseEvent closeEvent)
+        {
+            Interlocked.Exchange(ref _shouldCloseTask, 1);
+            var taskSignaled = 
_waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
+
+            if (closeEvent.Value.IsPresent())
+            {
+                var msg = Encoding.UTF8.GetString(closeEvent.Value.Value);
+                if (msg.Equals(TaskManager.CloseTaskByDriver))
+                {
+                    Logger.Log(Level.Info, "The task received close event with 
message: {0}.", msg);
+
+                    if (!taskSignaled)
+                    {
+                        throw new 
IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
+                    }
+                }
+            }
+            else
+            {
+                Logger.Log(Level.Warning, "The task received close event with 
no message.");
+            }
+        }
+
+        /// <summary>
+        /// Indicates if the task should be stopped.
+        /// </summary>
+        /// <returns></returns>
+        internal bool ShouldCloseTask()
+        {
+            return Interlocked.Read(ref _shouldCloseTask) == 1;
+        }
+
+        /// <summary>
+        /// Called from the task right before it returns, in order to signal 
_waitToCloseEvent. 
+        /// </summary>
+        internal void SignalTaskStopped()
+        {
+            _waitToCloseEvent.Set();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index cfe121d..116bc63 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -50,50 +50,33 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly IIMRUResultHandler<TResult> _resultHandler;
 
         /// <summary>
-        /// When receiving a close event, this variable is set to 1. At the 
beginning of each task iteration,
-        /// if this variable is set to 1, the task will break from the loop 
and return from the Call() method.
-        /// </summary>
-        private long _shouldCloseTask = 0;
-
-        /// <summary>
-        /// Before the task is returned, this variable is set to 1.
-        /// Close handler will check this variable to decide if it needs to 
throw an exception.
-        /// </summary>
-        private long _isTaskStopped = 0;
-
-        /// <summary>
         /// Shows if the object has been disposed.
         /// </summary>
         private int _disposed = 0;
 
         /// <summary>
-        /// Waiting time for the task to close by itself
-        /// </summary>
-        private readonly int _enforceCloseTimeoutMilliseconds;
-
-        /// <summary>
-        /// An event that will wait in close handler until it is either 
signaled from Call method or timeout.
+        /// Group Communication client for the task
         /// </summary>
-        private readonly ManualResetEventSlim _waitToCloseEvent = new 
ManualResetEventSlim(false);
+        private readonly IGroupCommClient _groupCommunicationsClient;
 
         /// <summary>
-        /// Group Communication client for the task
+        /// Task close Coordinator to handle the work when receiving task 
close event
         /// </summary>
-        private readonly IGroupCommClient _groupCommunicationsClient;
+        private readonly TaskCloseCoordinator _taskCloseCoordinator;
 
         /// <summary>
         /// </summary>
         /// <param name="updateTask">The UpdateTask hosted in this REEF 
Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
         /// <param name="resultHandler">Result handler</param>
-        /// <param name="enforceCloseTimeoutMilliseconds">Timeout in 
milliseconds to enforce the task to close if receiving task close event</param>
+        /// <param name="taskCloseCoordinator">Task close Coordinator</param>
         /// <param name="invokeGC">Whether to call Garbage Collector after 
each iteration or not</param>
         [Inject]
         private UpdateTaskHost(
             IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
             IGroupCommClient groupCommunicationsClient,
             IIMRUResultHandler<TResult> resultHandler,
-            [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int 
enforceCloseTimeoutMilliseconds,
+            TaskCloseCoordinator taskCloseCoordinator,
             [Parameter(typeof(InvokeGC))] bool invokeGC)
         {
             _updateTask = updateTask;
@@ -104,7 +87,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _dataReceiver = 
cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName);
             _invokeGC = invokeGC;
             _resultHandler = resultHandler;
-            _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds;
+            _taskCloseCoordinator = taskCloseCoordinator;
         }
 
         /// <summary>
@@ -117,7 +100,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             var updateResult = _updateTask.Initialize();
             int iterNo = 0;
 
-            while (updateResult.HasMapInput && Interlocked.Read(ref 
_shouldCloseTask) == 0)
+            while (updateResult.HasMapInput && 
!_taskCloseCoordinator.ShouldCloseTask())
             {
                 iterNo++;
 
@@ -145,7 +128,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                 }
             }
 
-            if (Interlocked.Read(ref _shouldCloseTask) == 0)
+            if (!_taskCloseCoordinator.ShouldCloseTask())
             {
                 MapInputWithControlMessage<TMapInput> stopMessage =
                     new 
MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
@@ -153,41 +136,21 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             }
 
             _resultHandler.Dispose();
-            Interlocked.Exchange(ref _isTaskStopped, 1);
-
-            if (Interlocked.Read(ref _shouldCloseTask) == 1)
-            {
-                _waitToCloseEvent.Set();
-            }
+            _taskCloseCoordinator.SignalTaskStopped();
             return null;
         }
 
         /// <summary>
-        /// Task close handler.
-        /// If the closed event is sent from driver, set _shouldCloseTask to 1 
so that to inform the Call() to stop at the end of the current iteration.
-        /// Then waiting for the signal from Call method. Either it is 
signaled or after _enforceCloseTimeoutMilliseconds,
-        /// checks if the task has been stopped. If not, throw 
IMRUTaskSystemException to enforce the task to stop.
+        /// Task close handler. Call TaskCloseCoordinator to handle the event.
         /// </summary>
         /// <param name="closeEvent"></param>
         public void OnNext(ICloseEvent closeEvent)
         {
-            var msg = Encoding.UTF8.GetString(closeEvent.Value.Value);
-            if (closeEvent.Value.IsPresent() && 
msg.Equals(TaskManager.CloseTaskByDriver))
-            {
-                Logger.Log(Level.Info, "The task received close event with 
message: {0}.", msg);
-                Interlocked.Exchange(ref _shouldCloseTask, 1);
-
-                
_waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds));
-
-                if (Interlocked.Read(ref _isTaskStopped) == 0)
-                {
-                    throw new 
IMRUTaskSystemException(TaskManager.TaskKilledByDriver);
-                }
-            }
+            _taskCloseCoordinator.HandleEvent(closeEvent);
         }
 
         /// <summary>
-        /// Dispose function
+        /// Dispose function. Dispose IGroupCommunicationsClient.
         /// </summary>
         public void Dispose()
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 3f62b1f..cdf87cc 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -94,6 +94,7 @@ under the License.
     <Compile Include="OnREEF\IMRUTasks\IMRUTaskGroupCommunicationException.cs" 
/>
     <Compile Include="OnREEF\IMRUTasks\IMRUTaskSystemException.cs" />
     <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
+    <Compile Include="OnREEF\IMRUTasks\TaskCloseCoordinator.cs" />
     <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" 
/>
     <Compile 
Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs
new file mode 100644
index 0000000..6a5eb6a
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs
@@ -0,0 +1,301 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Task;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Bridge
+{
+    public class TestDisposeTasks : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TestDisposeTasks));
+
+        private const string ExitByReturn = "ExitByReturn";
+        private const string ExitByException = "ExitByException";
+        private const string TaskIsDisposed = "TaskIsDisposed";
+        private const string TaskKilledByDriver = "TaskKilledByDriver";
+        private const string TaskId = "TaskId";
+        private const string ContextId = "ContextId";
+
+        /// <summary>
+        /// Test scenario: Task returned properly then disposed
+        /// </summary>
+        [Fact]
+        public void TestDisposeInTaskNormalReturnOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestRun(DriverConfigurations(1), typeof(TestDisposeTasks), 1, 
"TestDisposeTasks", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, 0, 0, testFolder);
+            var messages = new List<string> { TaskIsDisposed };
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, 1);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Test scenario: Task is enforced to close after receiving close 
event
+        /// </summary>
+        [Fact]
+        public void TestDisposeInTaskExceptionOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestRun(DriverConfigurations(2), typeof(TestDisposeTasks), 1, 
"TestDisposeTasks", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, 1, 0, testFolder);
+            var messages = new List<string> { TaskIsDisposed };
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, 1);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Test scenario: Dispose context while the task is still running.
+        /// </summary>
+        [Fact]
+        public void TestDisposeFromContextInRunningOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestRun(DriverConfigurations(3), typeof(TestDisposeTasks), 1, 
"TestDisposeTasks", "local", testFolder);
+            ValidateSuccessForLocalRuntime(1, 0, 0, testFolder);
+            var messages = new List<string>();
+            messages.Add(TaskIsDisposed);
+            ValidateMessageSuccessfullyLogged(messages, "Node-*", 
EvaluatorStdout, testFolder, 1);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Driver configuration for the test driver
+        /// </summary>
+        /// <returns></returns>
+        public IConfiguration DriverConfigurations(int taskNumber)
+        {
+            var taskIdConfig = TangFactory.GetTang()
+                .NewConfigurationBuilder()
+                .BindStringNamedParam<TaskNumber>(taskNumber.ToString())
+                .Build();
+
+            var driverConfig = DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, 
GenericType<DisposeTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<DisposeTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnContextActive, 
GenericType<DisposeTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskRunning, 
GenericType<DisposeTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, 
GenericType<DisposeTaskTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskFailed, 
GenericType<DisposeTaskTestDriver>.Class)
+                .Build();
+
+            return Configurations.Merge(taskIdConfig, driverConfig);
+        }
+
+        /// <summary>
+        /// Test driver
+        /// </summary>
+        private sealed class DisposeTaskTestDriver :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IActiveContext>,
+            IObserver<ICompletedTask>,
+            IObserver<IFailedTask>,
+            IObserver<IRunningTask>            
+        {
+            private readonly IEvaluatorRequestor _requestor;            
+            private readonly string _taskNumber;
+
+            [Inject]
+            private DisposeTaskTestDriver(IEvaluatorRequestor 
evaluatorRequestor,
+                [Parameter(typeof(TaskNumber))] string taskNumber)
+            {
+                _requestor = evaluatorRequestor;
+                _taskNumber = taskNumber;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                
_requestor.Submit(_requestor.NewBuilder().SetNumber(1).Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                value.SubmitContext(
+                    ContextConfiguration.ConfigurationModule
+                        .Set(ContextConfiguration.Identifier, ContextId)
+                        .Build());
+            }
+
+            public void OnNext(IActiveContext value)
+            {
+                value.SubmitTask(GetTaskConfigurationForCloseTask(TaskId + 
_taskNumber));
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                Logger.Log(Level.Info, "Task completed: " + value.Id);
+                Assert.Equal(TaskId + "1", value.Id);
+                value.ActiveContext.Dispose();
+            }
+
+            /// <summary>
+            /// Verify when exception is thrown in task, IFailedTask will be 
received here with the message set in the task
+            /// And verify the context associated with the failed task is the 
same as the context that the task was submitted
+            /// </summary>
+            /// <param name="value"></param>
+            public void OnNext(IFailedTask value)
+            {
+                Assert.Equal(TaskId + "2", value.Id);
+
+                var failedException = 
ByteUtilities.ByteArraysToString(value.Data.Value);
+                var e = value.AsError();
+                Logger.Log(Level.Error, "In IFailedTask: e.type: {0}, 
e.message: {1}.", e.GetType(), e.Message);
+                Logger.Log(Level.Error, "In IFailedTask: value.Data.Value: 
{0}, value.Message {1}.", failedException, value.Message);
+
+                Assert.Equal(typeof(Exception), e.GetType());
+                Assert.Equal(TaskKilledByDriver, e.Message);
+                Assert.Contains(TaskKilledByDriver, failedException);
+
+                value.GetActiveContext().Value.Dispose();
+            }
+
+            /// <summary>
+            /// Task1: Close the task and expect it to return from Call()
+            /// Task2: Close the task and expect it to throw an exception
+            /// Task3: Dispose the context to close a running task and make 
sure the task is disposed
+            /// </summary>
+            /// <param name="value"></param>
+            public void OnNext(IRunningTask value)
+            {
+                Logger.Log(Level.Info, "Task running: " + value.Id);
+                switch (value.Id)
+                {
+                    case TaskId + "1":
+                        value.Dispose(Encoding.UTF8.GetBytes(ExitByReturn));
+                        break;
+                    case TaskId + "2":
+                        value.Dispose(Encoding.UTF8.GetBytes(ExitByException));
+                        break;
+                    case TaskId + "3":
+                        value.ActiveContext.Dispose();
+                        break;
+                }
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            private static IConfiguration 
GetTaskConfigurationForCloseTask(string taskId)
+            {
+                return TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, taskId)
+                    .Set(TaskConfiguration.Task, 
GenericType<CloseAndDisposeTask>.Class)
+                    .Set(TaskConfiguration.OnClose, 
GenericType<CloseAndDisposeTask>.Class)
+                    .Build();
+            }
+        }
+
+        private sealed class CloseAndDisposeTask : ITask, 
IObserver<ICloseEvent>
+        {
+            private readonly CountdownEvent _suspendSignal = new 
CountdownEvent(1);
+            private int _disposed = 0;
+
+            [Inject]
+            private CloseAndDisposeTask()
+            {
+            }
+
+            public byte[] Call(byte[] memento)
+            {
+                Logger.Log(Level.Info, "Hello in CloseAndDisposeTask");
+                _suspendSignal.Wait();
+                return null;
+            }
+
+            public void Dispose()
+            {
+                if (Interlocked.Exchange(ref _disposed, 1) == 0)
+                {
+                    Logger.Log(Level.Info, TaskIsDisposed);
+                }
+            }
+
+            /// <summary>
+            /// Case 1: if the message is ExitByReturn, signal Call() to make 
the task return.  
+            /// Case 2: if the message is ExitByException, throw an exception so 
that the driver receives a FailedTask. 
+            /// Otherwise do nothing, expecting TaskRuntime to dispose the 
task. 
+            /// </summary>
+            /// <param name="closeEvent"></param>
+            public void OnNext(ICloseEvent closeEvent)
+            {
+                if (closeEvent.Value != null)
+                {
+                    if (closeEvent.Value.IsPresent())
+                    {
+                        string msg = 
Encoding.UTF8.GetString(closeEvent.Value.Value);
+                        Logger.Log(Level.Info, "Closed event received in 
task:" + msg);
+
+                        if (msg.Equals(ExitByReturn))
+                        {
+                            _suspendSignal.Signal();
+                        }
+                        else if (msg.Equals(ExitByException))
+                        {
+                            throw new Exception(TaskKilledByDriver);
+                        }
+                    }
+                    else
+                    {
+                        Logger.Log(Level.Info, "Closed event received in task 
with no message");
+                    }
+                }
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        [NamedParameter]
+        private class TaskNumber : Name<string>
+        {            
+        }        
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
index 2c766f2..c3521cd 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
@@ -52,17 +52,17 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         [Fact]
         public void TestTaskCloseOnLocalRuntime()
         {
-            int chunkSize = 2;
-            int dims = 50;
-            int iterations = 200;
-            int mapperMemory = 5120;
-            int updateTaskMemory = 5120;
-            int numTasks = 4;
-            string testFolder = DefaultRuntimeFolder + TestId;
+            const int chunkSize = 2;
+            const int dims = 50;
+            const int iterations = 200;
+            const int mapperMemory = 5120;
+            const int updateTaskMemory = 5120;
+            const int numTasks = 4;
+            var testFolder = DefaultRuntimeFolder + TestId;
             TestBroadCastAndReduce(false, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, testFolder);
             string[] lines = ReadLogFile(DriverStdout, "driver", testFolder);
-            int failedCount = GetMessageCount(lines, FailTaskMessage);
-            int completedCount = GetMessageCount(lines, CompletedTaskMessage);
+            var failedCount = GetMessageCount(lines, FailTaskMessage);
+            var completedCount = GetMessageCount(lines, CompletedTaskMessage);
             Assert.Equal(numTasks, failedCount + completedCount);
             CleanUp(testFolder);
         }
@@ -80,12 +80,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         [Fact(Skip = "Requires Yarn")]
         public void TestTaskCloseOnLocalRuntimeOnYarn()
         {
-            int chunkSize = 2;
-            int dims = 50;
-            int iterations = 200;
-            int mapperMemory = 5120;
-            int updateTaskMemory = 5120;
-            int numTasks = 4;
+            const int chunkSize = 2;
+            const int dims = 50;
+            const int iterations = 200;
+            const int mapperMemory = 5120;
+            const int updateTaskMemory = 5120;
+            const int numTasks = 4;
             TestBroadCastAndReduce(true, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 2e83ab6..e58f980 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -129,6 +129,7 @@ under the License.
     <Compile Include="Functional\ReefFunctionalTest.cs" />
     <Compile Include="Functional\RuntimeName\RuntimeNameTask.cs" />
     <Compile Include="Functional\RuntimeName\RuntimeNameTest.cs" />
+    <Compile Include="Functional\Bridge\TestDisposeTasks.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Utility\TestDriverConfigGenerator.cs" />
     <Compile Include="Utility\TestExceptions.cs" />

Reply via email to