Repository: reef Updated Branches: refs/heads/master 1c7adac04 -> 1b83ad213
[REEF-1549] Resolve the issue in WaitingForRegistration * Move WaitingForRegistration from the task constructors to the tasks' Call methods * Add a cancellation token to the WaitingForRegistration method * Enable test case TestFailMapperEvaluatorsOnInit.TestFailedMapperOnLocalRuntime, which tests the cancellation scenario in WaitingForRegistration JIRA: [REEF-1549](https://issues.apache.org/jira/browse/REEF-1549) This closes #1117 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1b83ad21 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1b83ad21 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1b83ad21 Branch: refs/heads/master Commit: 1b83ad21346ffb028dd8271ef4ac7b2f5af19b1f Parents: 1c7adac Author: Julia Wang <[email protected]> Authored: Fri Sep 9 18:07:18 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Tue Sep 13 20:02:22 2016 -0700 ---------------------------------------------------------------------- .../MachineLearning/KMeans/KMeansMasterTask.cs | 16 ++++--- .../MachineLearning/KMeans/KMeansSlaveTask.cs | 1 + .../OnREEF/IMRUTasks/TaskHostBase.cs | 1 + .../BroadcastReduceDriverAndTasks/MasterTask.cs | 1 + .../BroadcastReduceDriverAndTasks/SlaveTask.cs | 1 + .../PipelinedMasterTask.cs | 1 + .../PipelinedSlaveTask.cs | 2 + .../ScatterReduceDriverAndTasks/MasterTask.cs | 1 + .../ScatterReduceDriverAndTasks/SlaveTask.cs | 1 + .../Operators/IGroupCommOperatorInternal.cs | 4 +- .../Group/Operators/Impl/BroadcastReceiver.cs | 4 +- .../Group/Operators/Impl/BroadcastSender.cs | 5 ++- .../Group/Operators/Impl/ReduceReceiver.cs | 4 +- .../Group/Operators/Impl/ReduceSender.cs | 4 +- .../Group/Operators/Impl/ScatterReceiver.cs | 4 +- .../Group/Operators/Impl/ScatterSender.cs | 5 ++- .../Task/ICommunicationGroupClientInternal.cs | 4 +- .../Group/Task/IGroupCommClient.cs | 7 +++ .../Group/Task/IOperatorTopology.cs | 2 +- .../Group/Task/Impl/CommunicationGroupClient.cs | 5 ++- 
.../Group/Task/Impl/GroupCommClient.cs | 31 +++++++++---- .../Group/Task/Impl/OperatorTopology.cs | 47 ++++++++++---------- .../NetworkService/NetworkService.cs | 15 +++++-- .../IMRU/TestFailMapperEvaluatorsOnInit.cs | 11 +++-- 24 files changed, 114 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs index 140897b..e85ecd6 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansMasterTask.cs @@ -33,15 +33,15 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans { private static readonly Logger Logger = Logger.GetLogger(typeof(KMeansMasterTask)); - private int _iteration = 0; + private int _iteration; - private readonly ICommunicationGroupClient _commGroup; private readonly IBroadcastSender<Centroids> _dataBroadcastSender; private readonly IBroadcastSender<ControlMessage> _controlBroadcastSender; private readonly IReduceReceiver<ProcessedResults> _meansReducerReceiver; private readonly string _kMeansExecutionDirectory; private Centroids _centroids; private bool _isInitialIteration; + private readonly IGroupCommClient _groupCommClient; [Inject] public KMeansMasterTask( @@ -55,10 +55,11 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans { throw new ArgumentException("There must be more than 1 Evaluators in total, but the total evaluators number provided is " + totalNumEvaluators); } - _commGroup = groupCommClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName); - _dataBroadcastSender = 
_commGroup.GetBroadcastSender<Centroids>(Constants.CentroidsBroadcastOperatorName); - _meansReducerReceiver = _commGroup.GetReduceReceiver<ProcessedResults>(Constants.MeansReduceOperatorName); - _controlBroadcastSender = _commGroup.GetBroadcastSender<ControlMessage>(Constants.ControlMessageBroadcastOperatorName); + _groupCommClient = groupCommClient; + var commGroup = _groupCommClient.GetCommunicationGroup(Constants.KMeansCommunicationGroupName); + _dataBroadcastSender = commGroup.GetBroadcastSender<Centroids>(Constants.CentroidsBroadcastOperatorName); + _meansReducerReceiver = commGroup.GetReduceReceiver<ProcessedResults>(Constants.MeansReduceOperatorName); + _controlBroadcastSender = commGroup.GetBroadcastSender<ControlMessage>(Constants.ControlMessageBroadcastOperatorName); _kMeansExecutionDirectory = executionDirectory; _isInitialIteration = true; } @@ -66,7 +67,8 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans public byte[] Call(byte[] memento) { - // TODO: this belongs to dedicated dataloader layer, will refactor once we have that + // TODO: this belongs to dedicated data loader layer, will refactor once we have that + _groupCommClient.Initialize(); string centroidFile = Path.Combine(_kMeansExecutionDirectory, Constants.CentroidsFile); _centroids = new Centroids(DataPartitionCache.ReadDataFile(centroidFile)); http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs index db66ed1..9887459 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansSlaveTask.cs @@ -57,6 +57,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans public 
byte[] Call(byte[] memento) { + _groupCommClient.Initialize(); while (true) { if (_controlBroadcastReceiver.Receive() == ControlMessage.STOP) http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs index d025528..c14ff73 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs @@ -90,6 +90,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks Logger.Log(Level.Info, "Entering {0} Call().", TaskHostName); try { + _groupCommunicationsClient.Initialize(_cancellationSource); return TaskBody(memento); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs index 8419018..ade8df0 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs @@ -56,6 +56,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri public byte[] Call(byte[] memento) { + _groupCommClient.Initialize(); Stopwatch broadcastTime = new Stopwatch(); Stopwatch reduceTime = new Stopwatch(); 
http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs index cf1a4ec..d9f033f 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs @@ -51,6 +51,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri public byte[] Call(byte[] memento) { + _groupCommClient.Initialize(); Stopwatch broadcastTime = new Stopwatch(); Stopwatch reduceTime = new Stopwatch(); http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs index 984f67f..2c1ca59 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs @@ -60,6 +60,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR public byte[] Call(byte[] memento) { + _groupCommClient.Initialize(); int[] intArr = new int[_arraySize]; for (int j = 0; j < _arraySize; j++) 
http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs index 67c9db4..6cc008c 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs @@ -54,6 +54,8 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR public byte[] Call(byte[] memento) { + _groupCommClient.Initialize(); + int[] resArr = new int[_arraySize]; for (int j = 0; j < resArr.Length; j++) http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs index 644c597..840eb5d 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs @@ -47,6 +47,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive public byte[] Call(byte[] memento) { + _groupCommClient.Initialize(); List<int> data = Enumerable.Range(1, 100).ToList(); _scatterSender.Send(data); 
http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs index 748d685..c2ae829 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/SlaveTask.cs @@ -47,6 +47,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive public byte[] Call(byte[] memento) { + _groupCommClient.Initialize(); List<int> data = _scatterReceiver.Receive(); Logger.Log(Level.Info, "Received data: {0}", string.Join(" ", data)); http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs index 5267a2c..bcb8e44 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IGroupCommOperatorInternal.cs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +using System.Threading; + namespace Org.Apache.REEF.Network.Group.Operators { internal interface IGroupCommOperatorInternal @@ -22,6 +24,6 @@ namespace Org.Apache.REEF.Network.Group.Operators /// <summary> /// Ensure all parent and children nodes in the topology are registered with teh Name Service. 
/// </summary> - void WaitForRegistration(); + void WaitForRegistration(CancellationTokenSource cancellationSource); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs index cad774d..f7f9e8f 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs @@ -114,11 +114,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <summary> /// Ensure all parent and children nodes in the topology are registered with teh Name Service. /// </summary> - void IGroupCommOperatorInternal.WaitForRegistration() + void IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource cancellationSource) { if (_initialize) { - _topology.Initialize(); + _topology.Initialize(cancellationSource); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs index aa75c6e..59f8df9 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs @@ -16,6 +16,7 @@ // under the License. 
using System; +using System.Threading; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; @@ -108,11 +109,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <summary> /// Ensure all parent and children nodes in the topology are registered with teh Name Service. /// </summary> - void IGroupCommOperatorInternal.WaitForRegistration() + void IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource cancellationSource) { if (_initialize) { - _topology.Initialize(); + _topology.Initialize(cancellationSource); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs index fb129da..085e1db 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs @@ -120,11 +120,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <summary> /// Ensure all parent and children nodes in the topology are registered with teh Name Service. 
/// </summary> - void IGroupCommOperatorInternal.WaitForRegistration() + void IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource cancellationSource) { if (_initialize) { - _topology.Initialize(); + _topology.Initialize(cancellationSource); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs index 76202b2..9977d5f 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs @@ -140,11 +140,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <summary> /// Ensure all parent and children nodes in the topology are registered with teh Name Service. /// </summary> - void IGroupCommOperatorInternal.WaitForRegistration() + void IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource cancellationSource) { if (_initialize) { - _topology.Initialize(); + _topology.Initialize(cancellationSource); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs index c9d3c49..06da501 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs @@ -95,11 +95,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <summary> /// Ensure all parent and children nodes in the topology are registered with the Name Service. 
/// </summary> - void IGroupCommOperatorInternal.WaitForRegistration() + void IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource cancellationSource) { if (_initialize) { - _topology.Initialize(); + _topology.Initialize(cancellationSource); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs index 440738b..1d97d20 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs @@ -17,6 +17,7 @@ using System.Collections.Generic; using System.Reactive; +using System.Threading; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Task; @@ -103,11 +104,11 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <summary> /// Ensure all parent and children nodes in the topology are registered with teh Name Service. 
/// </summary> - void IGroupCommOperatorInternal.WaitForRegistration() + void IGroupCommOperatorInternal.WaitForRegistration(CancellationTokenSource cancellationSource) { if (_initialize) { - _topology.Initialize(); + _topology.Initialize(cancellationSource); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs index 70064a8..6b71da7 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupClientInternal.cs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +using System.Threading; + namespace Org.Apache.REEF.Network.Group.Task { internal interface ICommunicationGroupClientInternal : ICommunicationGroupClient @@ -22,6 +24,6 @@ namespace Org.Apache.REEF.Network.Group.Task /// <summary> /// Call each Operator to ensure all the nodes in the topology group has been registered /// </summary> - void WaitingForRegistration(); + void WaitingForRegistration(CancellationTokenSource cancellationSource); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs index b3356d3..a090664 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommClient.cs @@ -16,6 +16,7 @@ // under the License. 
using System; +using System.Threading; using Org.Apache.REEF.Network.Group.Task.Impl; using Org.Apache.REEF.Tang.Annotations; @@ -33,5 +34,11 @@ namespace Org.Apache.REEF.Network.Group.Task /// <param name="groupName">The name of the CommunicationGroupClient</param> /// <returns>The configured CommunicationGroupClient</returns> ICommunicationGroupClient GetCommunicationGroup(string groupName); + + /// <summary> + /// Initialization for group communications + /// </summary> + /// <param name="cancellationSource"></param> + void Initialize(CancellationTokenSource cancellationSource = null); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs index 97877e8..71e9b9e 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IOperatorTopology.cs @@ -35,7 +35,7 @@ namespace Org.Apache.REEF.Network.Group.Task /// Waits until all Tasks in the CommunicationGroup have registered themselves /// with the Name Service. /// </summary> - void Initialize(); + void Initialize(CancellationTokenSource cancellationSource); /// <summary> /// Sends the message to the parent Task. 
http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs index 2fc90d9..ee50335 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupClient.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Reflection; +using System.Threading; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; @@ -171,12 +172,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <summary> /// Call each Operator to ensure all the nodes in the topology group has been registered /// </summary> - void ICommunicationGroupClientInternal.WaitingForRegistration() + void ICommunicationGroupClientInternal.WaitingForRegistration(CancellationTokenSource cancellationSource) { foreach (var op in _operators.Values) { var method = op.GetType().GetMethod("Org.Apache.REEF.Network.Group.Operators.IGroupCommOperatorInternal.WaitForRegistration", BindingFlags.NonPublic | BindingFlags.Instance); - method.Invoke(op, null); + method.Invoke(op, new object[] { cancellationSource }); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs index c88e59c..6245fff 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs +++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs @@ -17,15 +17,14 @@ using System; using System.Collections.Generic; +using System.Threading; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote.Impl; @@ -44,6 +43,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl private readonly INetworkService<GeneralGroupCommunicationMessage> _networkService; /// <summary> + /// Shows if the object has been disposed. + /// </summary> + private int _disposed; + + /// <summary> /// Creates a new WritableGroupCommClient and registers the task ID with the Name Server. /// </summary> /// <param name="groupConfigs">The set of serialized Group Communication configurations</param> @@ -71,19 +75,25 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } networkService.Register(new StringIdentifier(taskId)); + } + /// <summary> + /// This is to ensure all the nodes in the groups are registered before starting communications. 
+ /// </summary> + /// <param name="cancellationSource"></param> + public void Initialize(CancellationTokenSource cancellationSource = null) + { try { foreach (var group in _commGroups.Values) { - group.WaitingForRegistration(); + group.WaitingForRegistration(cancellationSource); } } - catch (ReefRuntimeException e) + catch (Exception) { - networkService.Unregister(); - networkService.Dispose(); - Exceptions.CaughtAndThrow(e, Level.Error, "In GroupCommClient, exception from WaitingForRegistration.", Logger); + Dispose(); + throw; } } @@ -111,8 +121,11 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// </summary> public void Dispose() { - _networkService.Unregister(); - _networkService.Dispose(); + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + _networkService.Unregister(); + _networkService.Dispose(); + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index 202cfc4..357b4d5 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.Remoting; using System.Threading; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Tasks; @@ -114,7 +115,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Waits until all Tasks in the CommunicationGroup have registered themselves /// with the Name Service. 
/// </summary> - public void Initialize() + public void Initialize(CancellationTokenSource cancellationSource) { using (Logger.LogFunction("OperatorTopology::Initialize")) { @@ -132,7 +133,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl idsToWait.Add(child.Identifier); } } - WaitForTaskRegistration(idsToWait); + WaitForTaskRegistration(idsToWait, cancellationSource); } } @@ -357,42 +358,42 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Throws exception if the operation fails more than the retry count. /// </summary> /// <param name="identifiers">The identifier to look up</param> - private void WaitForTaskRegistration(IList<string> identifiers) + /// <param name="cancellationSource">The token to cancel the operation</param> + private void WaitForTaskRegistration(IList<string> identifiers, CancellationTokenSource cancellationSource) { using (Logger.LogFunction("OperatorTopology::WaitForTaskRegistration")) { IList<string> foundList = new List<string>(); - try + for (var i = 0; i < _retryCount; i++) { - for (var i = 0; i < _retryCount; i++) + if (cancellationSource != null && cancellationSource.Token.IsCancellationRequested) { - Logger.Log(Level.Info, "OperatorTopology.WaitForTaskRegistration, in retryCount {0}.", i); - foreach (var identifier in identifiers) - { - if (!foundList.Contains(identifier) && _nameClient.Lookup(identifier) != null) - { - foundList.Add(identifier); - Logger.Log(Level.Verbose, "OperatorTopology.WaitForTaskRegistration, find a dependent id {0} at loop {1}.", identifier, i); - } - } + Logger.Log(Level.Info, "OperatorTopology.WaitForTaskRegistration is canceled in retryCount {0}.", i); + throw new OperationCanceledException("WaitForTaskRegistration is canceled"); + } - if (foundList.Count == identifiers.Count) + Logger.Log(Level.Info, "OperatorTopology.WaitForTaskRegistration, in retryCount {0}.", i); + foreach (var identifier in identifiers) + { + if (!foundList.Contains(identifier) && _nameClient.Lookup(identifier) != null) { - 
Logger.Log(Level.Info, "OperatorTopology.WaitForTaskRegistration, found all {0} dependent ids at loop {1}.", foundList.Count, i); - return; + foundList.Add(identifier); + Logger.Log(Level.Verbose, "OperatorTopology.WaitForTaskRegistration, find a dependent id {0} at loop {1}.", identifier, i); } + } - Thread.Sleep(_sleepTime); + if (foundList.Count == identifiers.Count) + { + Logger.Log(Level.Info, "OperatorTopology.WaitForTaskRegistration, found all {0} dependent ids at loop {1}.", foundList.Count, i); + return; } - } - catch (Exception e) - { - Exceptions.CaughtAndThrow(e, Level.Error, "Exception in OperatorTopology.WaitForTaskRegistration.", Logger); + + Thread.Sleep(_sleepTime); } var leftOver = string.Join(",", identifiers.Where(e => !foundList.Contains(e))); Logger.Log(Level.Error, "For node {0}, cannot find registered parent/children: {1}.", _selfId, leftOver); - Exceptions.Throw(new ReefRuntimeException("Failed to initialize operator topology for node: " + _selfId), Logger); + Exceptions.Throw(new RemotingException("Failed to find parent/children nodes in operator topology for node: " + _selfId), Logger); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs index 53a74b4..9291178 100644 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/NetworkService.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Net; +using System.Threading; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Network.NetworkService.Codec; using Org.Apache.REEF.Tang.Annotations; @@ -44,6 +45,11 @@ namespace Org.Apache.REEF.Network.NetworkService private readonly Dictionary<IIdentifier, 
IConnection<T>> _connectionMap; /// <summary> + /// Shows if the object has been disposed. + /// </summary> + private int _disposed; + + /// <summary> /// Create a new NetworkService. /// </summary> /// <param name="nsPort">The port that the NetworkService will listen on</param> @@ -152,10 +158,13 @@ namespace Org.Apache.REEF.Network.NetworkService /// </summary> public void Dispose() { - NamingClient.Dispose(); - _remoteManager.Dispose(); + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + NamingClient.Dispose(); + _remoteManager.Dispose(); - LOGGER.Log(Level.Verbose, "Disposed of network service"); + LOGGER.Log(Level.Verbose, "Disposed of network service"); + } } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1b83ad21/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs index 5de2a6f..2dfd593 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs @@ -35,7 +35,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU /// This test is to throw exceptions in two tasks. In the first try, there is task app failure, /// and no retries will be done. 
/// </summary> - [Fact(Skip = "Times out at high timeout for RetryCountWaitingForRegistration; disabling until this parameter is configurable in test.")] + [Fact] public override void TestFailedMapperOnLocalRuntime() { int chunkSize = 2; @@ -59,9 +59,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage); var failedTaskCount = GetMessageCount(lines, FailedTaskMessage); - // on each try each task should fail or complete or disappear with failed evaluator - // not all tasks will start successfully, so not checking this - Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount); + // In each retry, there are 2 failed evaluators. + // The running tasks should receive cancellation and return properly. There will be no failed task. + // Rest of the tasks should be canceled and send completed task event to the driver. + Assert.Equal(NumberOfRetry * 2, failedEvaluatorCount); + Assert.Equal(0, failedTaskCount); + Assert.Equal(((NumberOfRetry + 1) * numTasks) - (NumberOfRetry * 2), completedTaskCount); CleanUp(testFolder); }
