Repository: reef
Updated Branches:
  refs/heads/master 1c7adac04 -> 1b83ad213


[REEF-1549] Resolve the issue in WaitingForRegistration

* Move WaitingForRegistration from constructor to Call method in tasks
* Add Cancellation token to WaitingForRegistration method
* Enable test case TestFailMapperEvaluatorsOnInit.TestFailedMapperOnLocalRuntime
  which tests the cancellation scenario in WaitingForRegistration

JIRA:
  [REEF-1549](https://issues.apache.org/jira/browse/REEF-1549)

This closes #1117


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1b83ad21
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1b83ad21
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1b83ad21

Branch: refs/heads/master
Commit: 1b83ad21346ffb028dd8271ef4ac7b2f5af19b1f
Parents: 1c7adac
Author: Julia Wang <[email protected]>
Authored: Fri Sep 9 18:07:18 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Tue Sep 13 20:02:22 2016 -0700

----------------------------------------------------------------------
 .../MachineLearning/KMeans/KMeansMasterTask.cs  | 16 ++++---
 .../MachineLearning/KMeans/KMeansSlaveTask.cs   |  1 +
 .../OnREEF/IMRUTasks/TaskHostBase.cs            |  1 +
 .../BroadcastReduceDriverAndTasks/MasterTask.cs |  1 +
 .../BroadcastReduceDriverAndTasks/SlaveTask.cs  |  1 +
 .../PipelinedMasterTask.cs                      |  1 +
 .../PipelinedSlaveTask.cs                       |  2 +
 .../ScatterReduceDriverAndTasks/MasterTask.cs   |  1 +
 .../ScatterReduceDriverAndTasks/SlaveTask.cs    |  1 +
 .../Operators/IGroupCommOperatorInternal.cs     |  4 +-
 .../Group/Operators/Impl/BroadcastReceiver.cs   |  4 +-
 .../Group/Operators/Impl/BroadcastSender.cs     |  5 ++-
 .../Group/Operators/Impl/ReduceReceiver.cs      |  4 +-
 .../Group/Operators/Impl/ReduceSender.cs        |  4 +-
 .../Group/Operators/Impl/ScatterReceiver.cs     |  4 +-
 .../Group/Operators/Impl/ScatterSender.cs       |  5 ++-
 .../Task/ICommunicationGroupClientInternal.cs   |  4 +-
 .../Group/Task/IGroupCommClient.cs              |  7 +++
 .../Group/Task/IOperatorTopology.cs             |  2 +-
 .../Group/Task/Impl/CommunicationGroupClient.cs |  5 ++-
 .../Group/Task/Impl/GroupCommClient.cs          | 31 +++++++++----
 .../Group/Task/Impl/OperatorTopology.cs         | 47 ++++++++++----------
 .../NetworkService/NetworkService.cs            | 15 +++++--
 .../IMRU/TestFailMapperEvaluatorsOnInit.cs      | 11 +++--
 24 files changed, 114 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
index 140897b..e85ecd6 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs
@@ -33,15 +33,15 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(KMeansMasterTask));
 
-        private int _iteration = 0;
+        private int _iteration;
 
-        private readonly ICommunicationGroupClient _commGroup;
         private readonly IBroadcastSender<Centroids> _dataBroadcastSender;
         private readonly IBroadcastSender<ControlMessage> 
_controlBroadcastSender;
         private readonly IReduceReceiver<ProcessedResults> 
_meansReducerReceiver;
         private readonly string _kMeansExecutionDirectory;
         private Centroids _centroids;
         private bool _isInitialIteration;
+        private readonly IGroupCommClient _groupCommClient;
 
         [Inject]
         public KMeansMasterTask(
@@ -55,10 +55,11 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 {
                     throw new ArgumentException("There must be more than 1 
Evaluators in total, but the total evaluators number provided is " + 
totalNumEvaluators);
                 }
-                _commGroup = 
groupCommClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName);
-                _dataBroadcastSender = 
_commGroup.GetBroadcastSender<Centroids>(Constants.CentroidsBroadcastOperatorName);
-                _meansReducerReceiver = 
_commGroup.GetReduceReceiver<ProcessedResults>(Constants.MeansReduceOperatorName);
-                _controlBroadcastSender = 
_commGroup.GetBroadcastSender<ControlMessage>(Constants.ControlMessageBroadcastOperatorName);
+                _groupCommClient = groupCommClient;
+                var commGroup = 
_groupCommClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName);
+                _dataBroadcastSender = 
commGroup.GetBroadcastSender<Centroids>(Constants.CentroidsBroadcastOperatorName);
+                _meansReducerReceiver = 
commGroup.GetReduceReceiver<ProcessedResults>(Constants.MeansReduceOperatorName);
+                _controlBroadcastSender = 
commGroup.GetBroadcastSender<ControlMessage>(Constants.ControlMessageBroadcastOperatorName);
                 _kMeansExecutionDirectory = executionDirectory;
                 _isInitialIteration = true;
             }
@@ -66,7 +67,8 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
 
         public byte[] Call(byte[] memento)
         {
-            // TODO: this belongs to dedicated dataloader layer, will refactor 
once we have that
+            // TODO: this belongs to dedicated data loader layer, will 
refactor once we have that
+            _groupCommClient.Initialize();
             string centroidFile = Path.Combine(_kMeansExecutionDirectory, 
Constants.CentroidsFile);
             _centroids = new 
Centroids(DataPartitionCache.ReadDataFile(centroidFile));
 

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs
index db66ed1..9887459 100644
--- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs
@@ -57,6 +57,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
 
         public byte[] Call(byte[] memento)
         {
+            _groupCommClient.Initialize();
             while (true)
             {
                 if (_controlBroadcastReceiver.Receive() == ControlMessage.STOP)

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
index d025528..c14ff73 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
@@ -90,6 +90,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             Logger.Log(Level.Info, "Entering {0} Call().", TaskHostName);
             try
             {
+                _groupCommunicationsClient.Initialize(_cancellationSource);
                 return TaskBody(memento);
             }
             catch (Exception e)

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
index 8419018..ade8df0 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
@@ -56,6 +56,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
 
         public byte[] Call(byte[] memento)
         {
+            _groupCommClient.Initialize();
             Stopwatch broadcastTime = new Stopwatch();
             Stopwatch reduceTime = new Stopwatch();
 

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
index cf1a4ec..d9f033f 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
@@ -51,6 +51,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
 
         public byte[] Call(byte[] memento)
         {
+            _groupCommClient.Initialize();
             Stopwatch broadcastTime = new Stopwatch();
             Stopwatch reduceTime = new Stopwatch();
 

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
index 984f67f..2c1ca59 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
@@ -60,6 +60,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
         public byte[] Call(byte[] memento)
         {
+            _groupCommClient.Initialize();
             int[] intArr = new int[_arraySize];
 
             for (int j = 0; j < _arraySize; j++)

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
index 67c9db4..6cc008c 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
@@ -54,6 +54,8 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
         public byte[] Call(byte[] memento)
         {
+            _groupCommClient.Initialize();
+
             int[] resArr = new int[_arraySize];
 
             for (int j = 0; j < resArr.Length; j++)

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
index 644c597..840eb5d 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
@@ -47,6 +47,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
 
         public byte[] Call(byte[] memento)
         {
+            _groupCommClient.Initialize();
             List<int> data = Enumerable.Range(1, 100).ToList();
             _scatterSender.Send(data);
 

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
index 748d685..c2ae829 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs
@@ -47,6 +47,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
 
         public byte[] Call(byte[] memento)
         {
+            _groupCommClient.Initialize();
             List<int> data = _scatterReceiver.Receive();
             Logger.Log(Level.Info, "Received data: {0}", string.Join(" ", 
data));
 

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs
index 5267a2c..bcb8e44 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading;
+
 namespace Org.Apache.REEF.Network.Group.Operators
 {
     internal interface IGroupCommOperatorInternal
@@ -22,6 +24,6 @@ namespace Org.Apache.REEF.Network.Group.Operators
         /// <summary>
         /// Ensure all parent and children nodes in the topology are 
registered with teh Name Service.
         /// </summary>
-        void WaitForRegistration();
+        void WaitForRegistration(CancellationTokenSource cancellationSource);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index cad774d..f7f9e8f 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -114,11 +114,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Ensure all parent and children nodes in the topology are 
registered with teh Name Service.
         /// </summary>
-        void IGroupCommOperatorInternal.WaitForRegistration()
+        void 
IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource 
cancellationSource)
         {
             if (_initialize)
             {
-                _topology.Initialize();
+                _topology.Initialize(cancellationSource);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
index aa75c6e..59f8df9 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -108,11 +109,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Ensure all parent and children nodes in the topology are 
registered with teh Name Service.
         /// </summary>
-        void IGroupCommOperatorInternal.WaitForRegistration()
+        void 
IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource 
cancellationSource)
         {
             if (_initialize)
             {
-                _topology.Initialize();
+                _topology.Initialize(cancellationSource);
             }
         }      
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
index fb129da..085e1db 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs
@@ -120,11 +120,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Ensure all parent and children nodes in the topology are 
registered with teh Name Service.
         /// </summary>
-        void IGroupCommOperatorInternal.WaitForRegistration()
+        void 
IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource 
cancellationSource)
         {
             if (_initialize)
             {
-                _topology.Initialize();
+                _topology.Initialize(cancellationSource);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index 76202b2..9977d5f 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -140,11 +140,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Ensure all parent and children nodes in the topology are 
registered with teh Name Service.
         /// </summary>
-        void IGroupCommOperatorInternal.WaitForRegistration()
+        void 
IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource 
cancellationSource)
         {
             if (_initialize)
             {
-                _topology.Initialize();
+                _topology.Initialize(cancellationSource);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index c9d3c49..06da501 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -95,11 +95,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Ensure all parent and children nodes in the topology are 
registered with the Name Service.
         /// </summary>
-        void IGroupCommOperatorInternal.WaitForRegistration()
+        void 
IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource 
cancellationSource)
         {
             if (_initialize)
             {
-                _topology.Initialize();
+                _topology.Initialize(cancellationSource);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
index 440738b..1d97d20 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs
@@ -17,6 +17,7 @@
 
 using System.Collections.Generic;
 using System.Reactive;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Task;
@@ -103,11 +104,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <summary>
         /// Ensure all parent and children nodes in the topology are 
registered with teh Name Service.
         /// </summary>
-        void IGroupCommOperatorInternal.WaitForRegistration()
+        void 
IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource 
cancellationSource)
         {
             if (_initialize)
             {
-                _topology.Initialize();
+                _topology.Initialize(cancellationSource);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs
 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs
index 70064a8..6b71da7 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Threading;
+
 namespace Org.Apache.REEF.Network.Group.Task
 {
     internal interface ICommunicationGroupClientInternal : 
ICommunicationGroupClient
@@ -22,6 +24,6 @@ namespace Org.Apache.REEF.Network.Group.Task
         /// <summary>
         /// Call each Operator to ensure all the nodes in the topology group 
has been registered
         /// </summary>
-        void WaitingForRegistration();
+        void WaitingForRegistration(CancellationTokenSource 
cancellationSource);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs
index b3356d3..a090664 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Task.Impl;
 using Org.Apache.REEF.Tang.Annotations;
 
@@ -33,5 +34,11 @@ namespace Org.Apache.REEF.Network.Group.Task
         /// <param name="groupName">The name of the 
CommunicationGroupClient</param>
         /// <returns>The configured CommunicationGroupClient</returns>
         ICommunicationGroupClient GetCommunicationGroup(string groupName);
+
+        /// <summary>
+        /// Initialization for group communications
+        /// </summary>
+        /// <param name="cancellationSource"></param>
+        void Initialize(CancellationTokenSource cancellationSource = null);
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
index 97877e8..71e9b9e 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs
@@ -35,7 +35,7 @@ namespace Org.Apache.REEF.Network.Group.Task
         /// Waits until all Tasks in the CommunicationGroup have registered 
themselves
         /// with the Name Service.
         /// </summary>
-        void Initialize();
+        void Initialize(CancellationTokenSource cancellationSource);
 
         /// <summary>
         /// Sends the message to the parent Task.

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
index 2fc90d9..ee50335 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 using System.Reflection;
+using System.Threading;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
@@ -171,12 +172,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <summary>
         /// Call each Operator to ensure all the nodes in the topology group 
has been registered
         /// </summary>
-        void ICommunicationGroupClientInternal.WaitingForRegistration()
+        void 
ICommunicationGroupClientInternal.WaitingForRegistration(CancellationTokenSource
 cancellationSource)
         {
             foreach (var op in _operators.Values)
             {
                 var method = 
op.GetType().GetMethod("Org.Apache.REEF.Network.Group.Operators.IGroupCommOperatorInternal.WaitForRegistration",
 BindingFlags.NonPublic | BindingFlags.Instance);
-                method.Invoke(op, null);
+                method.Invoke(op, new object[] { cancellationSource });
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
index c88e59c..6245fff 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
@@ -17,15 +17,14 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Remote.Impl;
 
@@ -44,6 +43,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         private readonly INetworkService<GeneralGroupCommunicationMessage> 
_networkService;
 
         /// <summary>
+        /// Shows if the object has been disposed.
+        /// </summary>
+        private int _disposed;
+
+        /// <summary>
         /// Creates a new WritableGroupCommClient and registers the task ID 
with the Name Server.
         /// </summary>
         /// <param name="groupConfigs">The set of serialized Group 
Communication configurations</param>
@@ -71,19 +75,25 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             }
 
             networkService.Register(new StringIdentifier(taskId));
+        }
 
+        /// <summary>
+        /// This is to ensure all the nodes in the groups are registered 
before starting communications.
+        /// </summary>
+        /// <param name="cancellationSource"></param>
+        public void Initialize(CancellationTokenSource cancellationSource = 
null)
+        {
             try
             {
                 foreach (var group in _commGroups.Values)
                 {
-                    group.WaitingForRegistration();
+                    group.WaitingForRegistration(cancellationSource);
                 }
             }
-            catch (ReefRuntimeException e)
+            catch (Exception)
             {
-                networkService.Unregister();
-                networkService.Dispose();
-                Exceptions.CaughtAndThrow(e, Level.Error, "In GroupCommClient, 
exception from WaitingForRegistration.", Logger);
+                Dispose();
+                throw;
             }
         }
 
@@ -111,8 +121,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// </summary>
         public void Dispose()
         {
-            _networkService.Unregister();
-            _networkService.Dispose();
+            if (Interlocked.Exchange(ref _disposed, 1) == 0)
+            {
+                _networkService.Unregister();
+                _networkService.Dispose();
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index 202cfc4..357b4d5 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Runtime.Remoting;
 using System.Threading;
 using Org.Apache.REEF.Common.Io;
 using Org.Apache.REEF.Common.Tasks;
@@ -114,7 +115,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Waits until all Tasks in the CommunicationGroup have registered 
themselves
         /// with the Name Service.
         /// </summary>
-        public void Initialize()
+        public void Initialize(CancellationTokenSource cancellationSource)
         {
             using (Logger.LogFunction("OperatorTopology::Initialize"))
             {
@@ -132,7 +133,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                         idsToWait.Add(child.Identifier);
                     }
                 }
-                WaitForTaskRegistration(idsToWait);
+                WaitForTaskRegistration(idsToWait, cancellationSource);
             }
         }
 
@@ -357,42 +358,42 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Throws exception if the operation fails more than the retry count.
         /// </summary>
         /// <param name="identifiers">The identifier to look up</param>
-        private void WaitForTaskRegistration(IList<string> identifiers)
+        /// <param name="cancellationSource">The source whose token cancels the 
operation</param>
+        private void WaitForTaskRegistration(IList<string> identifiers, 
CancellationTokenSource cancellationSource)
         {
             using 
(Logger.LogFunction("OperatorTopology::WaitForTaskRegistration"))
             {
                 IList<string> foundList = new List<string>();
-                try
+                for (var i = 0; i < _retryCount; i++)
                 {
-                    for (var i = 0; i < _retryCount; i++)
+                    if (cancellationSource != null && 
cancellationSource.Token.IsCancellationRequested)
                     {
-                        Logger.Log(Level.Info, 
"OperatorTopology.WaitForTaskRegistration, in retryCount {0}.", i);
-                        foreach (var identifier in identifiers)
-                        {
-                            if (!foundList.Contains(identifier) && 
_nameClient.Lookup(identifier) != null)
-                            {
-                                foundList.Add(identifier);
-                                Logger.Log(Level.Verbose, 
"OperatorTopology.WaitForTaskRegistration, find a dependent id {0} at loop 
{1}.", identifier, i);
-                            }
-                        }
+                        Logger.Log(Level.Info, 
"OperatorTopology.WaitForTaskRegistration is canceled in retryCount {0}.", i);
+                        throw new 
OperationCanceledException("WaitForTaskRegistration is canceled");
+                    }
 
-                        if (foundList.Count == identifiers.Count)
+                    Logger.Log(Level.Info, 
"OperatorTopology.WaitForTaskRegistration, in retryCount {0}.", i);
+                    foreach (var identifier in identifiers)
+                    {
+                        if (!foundList.Contains(identifier) && 
_nameClient.Lookup(identifier) != null)
                         {
-                            Logger.Log(Level.Info, 
"OperatorTopology.WaitForTaskRegistration, found all {0} dependent ids at loop 
{1}.", foundList.Count, i);
-                            return;
+                            foundList.Add(identifier);
+                            Logger.Log(Level.Verbose, 
"OperatorTopology.WaitForTaskRegistration, find a dependent id {0} at loop 
{1}.", identifier, i);
                         }
+                    }
 
-                        Thread.Sleep(_sleepTime);
+                    if (foundList.Count == identifiers.Count)
+                    {
+                        Logger.Log(Level.Info, 
"OperatorTopology.WaitForTaskRegistration, found all {0} dependent ids at loop 
{1}.", foundList.Count, i);
+                        return;
                     }
-                }
-                catch (Exception e)
-                {
-                    Exceptions.CaughtAndThrow(e, Level.Error, "Exception in 
OperatorTopology.WaitForTaskRegistration.", Logger);
+
+                    Thread.Sleep(_sleepTime);
                 }
 
                 var leftOver = string.Join(",", identifiers.Where(e => 
!foundList.Contains(e)));
                 Logger.Log(Level.Error, "For node {0}, cannot find registered 
parent/children: {1}.", _selfId, leftOver);
-                Exceptions.Throw(new ReefRuntimeException("Failed to 
initialize operator topology for node: " + _selfId), Logger);
+                Exceptions.Throw(new RemotingException("Failed to find 
parent/children nodes in operator topology for node: " + _selfId), Logger);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs 
b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
index 53a74b4..9291178 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs
@@ -18,6 +18,7 @@
 using System;
 using System.Collections.Generic;
 using System.Net;
+using System.Threading;
 using Org.Apache.REEF.Common.Io;
 using Org.Apache.REEF.Network.NetworkService.Codec;
 using Org.Apache.REEF.Tang.Annotations;
@@ -44,6 +45,11 @@ namespace Org.Apache.REEF.Network.NetworkService
         private readonly Dictionary<IIdentifier, IConnection<T>> 
_connectionMap;
 
         /// <summary>
+        /// Shows if the object has been disposed.
+        /// </summary>
+        private int _disposed;
+
+        /// <summary>
         /// Create a new NetworkService.
         /// </summary>
         /// <param name="nsPort">The port that the NetworkService will listen 
on</param>
@@ -152,10 +158,13 @@ namespace Org.Apache.REEF.Network.NetworkService
         /// </summary>
         public void Dispose()
         {
-            NamingClient.Dispose();
-            _remoteManager.Dispose();
+            if (Interlocked.Exchange(ref _disposed, 1) == 0)
+            {
+                NamingClient.Dispose();
+                _remoteManager.Dispose();
 
-            LOGGER.Log(Level.Verbose, "Disposed of network service");
+                LOGGER.Log(Level.Verbose, "Disposed of network service");
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
index 5de2a6f..2dfd593 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
@@ -35,7 +35,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// This test is to throw exceptions in two tasks. In the first try, 
there is task app failure,
         /// and no retries will be done. 
         /// </summary>
-        [Fact(Skip = "Times out at high timeout for 
RetryCountWaitingForRegistration; disabling until this parameter is 
configurable in test.")]
+        [Fact]
         public override void TestFailedMapperOnLocalRuntime()
         {
             int chunkSize = 2;
@@ -59,9 +59,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var failedEvaluatorCount = GetMessageCount(lines, 
FailedEvaluatorMessage);
             var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
 
-            // on each try each task should fail or complete or disappear with 
failed evaluator
-            // not all tasks will start successfully, so not checking this
-            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + 
failedEvaluatorCount + failedTaskCount);
+            // In each retry, there are 2 failed evaluators.
+            // The running tasks should receive cancellation and return 
properly. There will be no failed task.
+            // The rest of the tasks should be canceled and send a completed task 
event to the driver.
+            Assert.Equal(NumberOfRetry * 2, failedEvaluatorCount);
+            Assert.Equal(0, failedTaskCount);
+            Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 
2), completedTaskCount);
             CleanUp(testFolder);
         }
 

Reply via email to