Repository: reef Updated Branches: refs/heads/master 038b24dc6 -> c01d94737
[REEF-1522] Optimize WaitForTaskRegistration * Optimize WaitForTaskRegistration * Log for nodes that cannot be found * Log time for the entire WaitForTaskRegistration JIRA: [REEF-1522](https://issues.apache.org/jira/browse/REEF-1522) Pull Request: This closes #1092 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c01d9473 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c01d9473 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c01d9473 Branch: refs/heads/master Commit: c01d947376490c5d8fdc3199fd04cd484c1af9f5 Parents: 038b24d Author: Julia Wang <[email protected]> Authored: Thu Aug 11 11:21:46 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Tue Aug 23 08:24:49 2016 -0700 ---------------------------------------------------------------------- .../Group/Task/Impl/GroupCommClient.cs | 19 +++++++- .../Group/Task/Impl/OperatorTopology.cs | 49 +++++++++++++++----- 2 files changed, 55 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/c01d9473/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs index 8300c83..b4f8374 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs @@ -17,13 +17,17 @@ using System; using System.Collections.Generic; +using System.Globalization; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Formats; 
using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote.Impl; namespace Org.Apache.REEF.Network.Group.Task.Impl @@ -34,6 +38,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// </summary> public sealed class GroupCommClient : IGroupCommClient { + private static readonly Logger Logger = Logger.GetLogger(typeof(GroupCommClient)); + private readonly Dictionary<string, ICommunicationGroupClientInternal> _commGroups; private readonly INetworkService<GeneralGroupCommunicationMessage> _networkService; @@ -67,9 +73,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl networkService.Register(new StringIdentifier(taskId)); - foreach (var group in _commGroups.Values) + try + { + foreach (var group in _commGroups.Values) + { + group.WaitingForRegistration(); + } + } + catch (SystemException e) { - group.WaitingForRegistration(); + networkService.Unregister(); + networkService.Dispose(); + Exceptions.CaughtAndThrow(e, Level.Error, "In GroupCommClient, exception from WaitingForRegistration.", Logger); } } http://git-wip-us.apache.org/repos/asf/reef/blob/c01d9473/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index c4d4d0a..80b4dd3 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -28,6 +28,7 @@ using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Network.Group.Task.Impl @@ -117,18 +118,21 
@@ namespace Org.Apache.REEF.Network.Group.Task.Impl { using (Logger.LogFunction("OperatorTopology::Initialize")) { + IList<string> idsToWait = new List<string>(); + if (_parent != null) { - WaitForTaskRegistration(_parent.Identifier, _retryCount); + idsToWait.Add(_parent.Identifier); } if (_childNodeContainer.Count > 0) { foreach (var child in _childNodeContainer) { - WaitForTaskRegistration(child.Identifier, _retryCount); + idsToWait.Add(child.Identifier); } } + WaitForTaskRegistration(idsToWait); } } @@ -352,21 +356,44 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Checks if the identifier is registered with the Name Server. /// Throws exception if the operation fails more than the retry count. /// </summary> - /// <param name="identifier">The identifier to look up</param> - /// <param name="retries">The number of times to retry the lookup operation</param> - private void WaitForTaskRegistration(string identifier, int retries) + /// <param name="identifiers">The identifier to look up</param> + private void WaitForTaskRegistration(IList<string> identifiers) { - for (int i = 0; i < retries; i++) + using (Logger.LogFunction("OperatorTopology::WaitForTaskRegistration")) { - if (_nameClient.Lookup(identifier) != null) + IList<string> foundList = new List<string>(); + try + { + for (var i = 0; i < _retryCount; i++) + { + Logger.Log(Level.Info, "OperatorTopology.WaitForTaskRegistration, in retryCount {0}.", i); + foreach (var identifier in identifiers) + { + if (!foundList.Contains(identifier) && _nameClient.Lookup(identifier) != null) + { + foundList.Add(identifier); + Logger.Log(Level.Verbose, "OperatorTopology.WaitForTaskRegistration, find a dependent id {0} at loop {1}.", identifier, i); + } + } + + if (foundList.Count == identifiers.Count) + { + Logger.Log(Level.Info, "OperatorTopology.WaitForTaskRegistration, found all {0} dependent ids at loop {1}.", foundList.Count, i); + return; + } + + Thread.Sleep(_sleepTime); + } + } + catch (Exception e) { - 
return; + Exceptions.CaughtAndThrow(e, Level.Error, "Exception in OperatorTopology.WaitForTaskRegistration.", Logger); } - Thread.Sleep(_sleepTime); + var leftOver = string.Join(",", identifiers.Where(e => !foundList.Contains(e))); + Logger.Log(Level.Error, "For node {0}, cannot find registered parent/children: {1}.", _selfId, leftOver); + Exceptions.Throw(new SystemException("Failed to initialize operator topology for node: " + _selfId), Logger); } - - throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier); } } } \ No newline at end of file
