Repository: reef
Updated Branches:
  refs/heads/master af8b9086e -> 6b6752426


[REEF-1420] Dispose IGroupCommClient/Network Service from IMRU tasks

JIRA: [REEF-1420](https://issues.apache.org/jira/browse/REEF-1420)
This closes #1025


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/6b675242
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/6b675242
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/6b675242

Branch: refs/heads/master
Commit: 6b675242616c6fc60d65d61eeadc831eef349886
Parents: af8b908
Author: Julia Wang <[email protected]>
Authored: Sat Jun 4 12:29:58 2016 -0700
Committer: Andrew Chung <[email protected]>
Committed: Mon Jun 6 14:21:32 2016 -0700

----------------------------------------------------------------------
 .../OnREEF/IMRUTasks/MapTaskHost.cs                  | 15 +++++++++++++++
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs               | 15 +++++++++++++++
 2 files changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/6b675242/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
index 5f9823a..6f42cd5 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -60,6 +60,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private long _isTaskStopped = 0;
 
         /// <summary>
+        /// Shows if the object has been disposed.
+        /// </summary>
+        private int _disposed = 0;
+
+        /// <summary>
         /// Waiting time for the task to close by itself
         /// </summary>
         private readonly int _enforceCloseTimeoutMilliseconds;
@@ -70,6 +75,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false);
 
         /// <summary>
+        /// Group Communication client for the task
+        /// </summary>
+        private readonly IGroupCommClient _groupCommunicationsClient;
+
+        /// <summary>
         /// </summary>
         /// <param name="mapTask">The MapTask hosted in this REEF Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the communications.</param>
@@ -83,6 +93,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             [Parameter(typeof(InvokeGC))] bool invokeGC)
         {
             _mapTask = mapTask;
+            _groupCommunicationsClient = groupCommunicationsClient;
             var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
             _dataAndMessageReceiver =
                 cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
@@ -160,6 +171,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// </summary>
         public void Dispose()
         {
+            if (Interlocked.Exchange(ref _disposed, 1) == 0)
+            {
+                _groupCommunicationsClient.Dispose();
+            }
         }
 
         public void OnError(Exception error)

http://git-wip-us.apache.org/repos/asf/reef/blob/6b675242/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index 4f9ad9b..cfe121d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -62,6 +62,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private long _isTaskStopped = 0;
 
         /// <summary>
+        /// Shows if the object has been disposed.
+        /// </summary>
+        private int _disposed = 0;
+
+        /// <summary>
         /// Waiting time for the task to close by itself
         /// </summary>
         private readonly int _enforceCloseTimeoutMilliseconds;
@@ -72,6 +77,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false);
 
         /// <summary>
+        /// Group Communication client for the task
+        /// </summary>
+        private readonly IGroupCommClient _groupCommunicationsClient;
+
+        /// <summary>
         /// </summary>
         /// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the communications.</param>
@@ -87,6 +97,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             [Parameter(typeof(InvokeGC))] bool invokeGC)
         {
             _updateTask = updateTask;
+            _groupCommunicationsClient = groupCommunicationsClient;
             var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
             _dataAndControlMessageSender =
                 cg.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
@@ -180,6 +191,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// </summary>
         public void Dispose()
         {
+            if (Interlocked.Exchange(ref _disposed, 1) == 0)
+            {
+                _groupCommunicationsClient.Dispose();
+            }
         }
 
         public void OnError(Exception error)

Reply via email to