Repository: reef
Updated Branches:
  refs/heads/master 038b24dc6 -> c01d94737


[REEF-1522] Optimize WaitForTaskRegistration

  * Optimize WaitForTaskRegistration
  * Log for nodes that cannot be found
  * Log time for the entire WaitForTaskRegistration

JIRA:
  [REEF-1522](https://issues.apache.org/jira/browse/REEF-1522)

Pull Request:
  This closes #1092


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c01d9473
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c01d9473
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c01d9473

Branch: refs/heads/master
Commit: c01d947376490c5d8fdc3199fd04cd484c1af9f5
Parents: 038b24d
Author: Julia Wang <[email protected]>
Authored: Thu Aug 11 11:21:46 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Aug 23 08:24:49 2016 -0700

----------------------------------------------------------------------
 .../Group/Task/Impl/GroupCommClient.cs          | 19 +++++++-
 .../Group/Task/Impl/OperatorTopology.cs         | 49 +++++++++++++++-----
 2 files changed, 55 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/c01d9473/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
index 8300c83..b4f8374 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs
@@ -17,13 +17,17 @@
 
 using System;
 using System.Collections.Generic;
+using System.Globalization;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
@@ -34,6 +38,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// </summary>
     public sealed class GroupCommClient : IGroupCommClient
     {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(GroupCommClient));
+
         private readonly Dictionary<string, ICommunicationGroupClientInternal> 
_commGroups;
 
         private readonly INetworkService<GeneralGroupCommunicationMessage> 
_networkService;
@@ -67,9 +73,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
             networkService.Register(new StringIdentifier(taskId));
 
-            foreach (var group in _commGroups.Values)
+            try
+            {
+                foreach (var group in _commGroups.Values)
+                {
+                    group.WaitingForRegistration();
+                }
+            }
+            catch (SystemException e)
             {
-               group.WaitingForRegistration();
+                networkService.Unregister();
+                networkService.Dispose();
+                Exceptions.CaughtAndThrow(e, Level.Error, "In GroupCommClient, 
exception from WaitingForRegistration.", Logger);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/c01d9473/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index c4d4d0a..80b4dd3 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -28,6 +28,7 @@ using Org.Apache.REEF.Network.Group.Operators.Impl;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Exceptions;
+using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Network.Group.Task.Impl
@@ -117,18 +118,21 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             using (Logger.LogFunction("OperatorTopology::Initialize"))
             {
+                IList<string> idsToWait = new List<string>();
+
                 if (_parent != null)
                 {
-                    WaitForTaskRegistration(_parent.Identifier, _retryCount);
+                    idsToWait.Add(_parent.Identifier);
                 }
 
                 if (_childNodeContainer.Count > 0)
                 {
                     foreach (var child in _childNodeContainer)
                     {
-                        WaitForTaskRegistration(child.Identifier, _retryCount);
+                        idsToWait.Add(child.Identifier);
                     }
                 }
+                WaitForTaskRegistration(idsToWait);
             }
         }
 
@@ -352,21 +356,44 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Checks if the identifier is registered with the Name Server.
         /// Throws exception if the operation fails more than the retry count.
         /// </summary>
-        /// <param name="identifier">The identifier to look up</param>
-        /// <param name="retries">The number of times to retry the lookup 
operation</param>
-        private void WaitForTaskRegistration(string identifier, int retries)
+        /// <param name="identifiers">The identifiers to look up</param>
+        private void WaitForTaskRegistration(IList<string> identifiers)
         {
-            for (int i = 0; i < retries; i++)
+            using 
(Logger.LogFunction("OperatorTopology::WaitForTaskRegistration"))
             {
-                if (_nameClient.Lookup(identifier) != null)
+                IList<string> foundList = new List<string>();
+                try
+                {
+                    for (var i = 0; i < _retryCount; i++)
+                    {
+                        Logger.Log(Level.Info, 
"OperatorTopology.WaitForTaskRegistration, in retryCount {0}.", i);
+                        foreach (var identifier in identifiers)
+                        {
+                            if (!foundList.Contains(identifier) && 
_nameClient.Lookup(identifier) != null)
+                            {
+                                foundList.Add(identifier);
+                                Logger.Log(Level.Verbose, 
"OperatorTopology.WaitForTaskRegistration, find a dependent id {0} at loop 
{1}.", identifier, i);
+                            }
+                        }
+
+                        if (foundList.Count == identifiers.Count)
+                        {
+                            Logger.Log(Level.Info, 
"OperatorTopology.WaitForTaskRegistration, found all {0} dependent ids at loop 
{1}.", foundList.Count, i);
+                            return;
+                        }
+
+                        Thread.Sleep(_sleepTime);
+                    }
+                }
+                catch (Exception e)
                 {
-                    return;
+                    Exceptions.CaughtAndThrow(e, Level.Error, "Exception in 
OperatorTopology.WaitForTaskRegistration.", Logger);
                 }
 
-                Thread.Sleep(_sleepTime);
+                var leftOver = string.Join(",", identifiers.Where(e => 
!foundList.Contains(e)));
+                Logger.Log(Level.Error, "For node {0}, cannot find registered 
parent/children: {1}.", _selfId, leftOver);
+                Exceptions.Throw(new SystemException("Failed to initialize 
operator topology for node: " + _selfId), Logger);
             }
-
-            throw new IllegalStateException("Failed to initialize operator 
topology for node: " + identifier);
         }
     }
 }
\ No newline at end of file

Reply via email to