Repository: reef Updated Branches: refs/heads/master af8b9086e -> 6b6752426
[REEF-1420] Dispose IGroupCommClient/Network Service from IMRU tasks JIRA: [REEF-1420](https://issues.apache.org/jira/browse/REEF-1420) This closes #1025 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/6b675242 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/6b675242 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/6b675242 Branch: refs/heads/master Commit: 6b675242616c6fc60d65d61eeadc831eef349886 Parents: af8b908 Author: Julia Wang <[email protected]> Authored: Sat Jun 4 12:29:58 2016 -0700 Committer: Andrew Chung <[email protected]> Committed: Mon Jun 6 14:21:32 2016 -0700 ---------------------------------------------------------------------- .../OnREEF/IMRUTasks/MapTaskHost.cs | 15 +++++++++++++++ .../OnREEF/IMRUTasks/UpdateTaskHost.cs | 15 +++++++++++++++ 2 files changed, 30 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/6b675242/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs index 5f9823a..6f42cd5 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs @@ -60,6 +60,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private long _isTaskStopped = 0; /// <summary> + /// Shows if the object has been disposed. + /// </summary> + private int _disposed = 0; + + /// <summary> /// Waiting time for the task to close by itself /// </summary> private readonly int _enforceCloseTimeoutMilliseconds; @@ -70,6 +75,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false); /// <summary> + /// Group Communication client for the task + /// </summary> + private readonly IGroupCommClient _groupCommunicationsClient; + + /// <summary> /// </summary> /// <param name="mapTask">The MapTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> @@ -83,6 +93,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks [Parameter(typeof(InvokeGC))] bool invokeGC) { _mapTask = mapTask; + _groupCommunicationsClient = groupCommunicationsClient; var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); _dataAndMessageReceiver = cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); @@ -160,6 +171,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// </summary> public void Dispose() { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + _groupCommunicationsClient.Dispose(); + } } public void OnError(Exception error) http://git-wip-us.apache.org/repos/asf/reef/blob/6b675242/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index 4f9ad9b..cfe121d 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -62,6 +62,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private long _isTaskStopped = 0; /// <summary> + /// Shows if the object has been disposed. + /// </summary> + private int _disposed = 0; + + /// <summary> /// Waiting time for the task to close by itself /// </summary> private readonly int _enforceCloseTimeoutMilliseconds; @@ -72,6 +77,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false); /// <summary> + /// Group Communication client for the task + /// </summary> + private readonly IGroupCommClient _groupCommunicationsClient; + + /// <summary> /// </summary> /// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> @@ -87,6 +97,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks [Parameter(typeof(InvokeGC))] bool invokeGC) { _updateTask = updateTask; + _groupCommunicationsClient = groupCommunicationsClient; var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); _dataAndControlMessageSender = cg.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); @@ -180,6 +191,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// </summary> public void Dispose() { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + _groupCommunicationsClient.Dispose(); + } } public void OnError(Exception error)
