Repository: reef
Updated Branches:
  refs/heads/master 528706a0d -> d9a390303


[REEF-1469] Simplify logic in OperatorTopology

This addresses the issue by:
  * Replacing the logic in GetNodeWithData with synchronous calls to GetData.
    This is okay because we need to wait for all child nodes to receive data
    before proceeding.
  * Renaming variables for clarity.
  * Adding ChildNodeContainer to simplify management of children nodes.

JIRA:
  [REEF-1469](https://issues.apache.org/jira/browse/REEF-1469)

Pull Request:
  Closes #1051


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d9a39030
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d9a39030
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d9a39030

Branch: refs/heads/master
Commit: d9a390303419f7997f6398054c5df80f40a77829
Parents: 528706a
Author: Andrew Chung <[email protected]>
Authored: Wed Jun 22 17:18:13 2016 -0700
Committer: dhruv <[email protected]>
Committed: Mon Jun 27 16:39:53 2016 -0700

----------------------------------------------------------------------
 .../Group/Task/Impl/ChildNodeContainer.cs       |  92 +++++++++
 .../Group/Task/Impl/NodeStruct.cs               |  17 --
 .../Group/Task/Impl/OperatorTopology.cs         | 205 ++++---------------
 .../Org.Apache.REEF.Network.csproj              |   1 +
 4 files changed, 132 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/d9a39030/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
new file mode 100644
index 0000000..297212e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Utilities.Attributes;
+
+namespace Org.Apache.REEF.Network.Group.Task.Impl
+{
+    /// <summary>
+    /// A container for children nodes in Group Communications, keyed by each node's Identifier.
+    /// </summary>
+    [NotThreadSafe]
+    internal sealed class ChildNodeContainer<T> : IEnumerable<NodeStruct<T>>
+    {
+        private readonly Dictionary<string, NodeStruct<T>> _childIdToNodeMap =
+            new Dictionary<string, NodeStruct<T>>();
+
+        /// <summary>
+        /// Gets the number of children.
+        /// </summary>
+        public int Count
+        {
+            get { return _childIdToNodeMap.Count; }
+        }
+
+        /// <summary>
+        /// Puts the child node into the container; Dictionary.Add throws ArgumentException on a duplicate Identifier.
+        /// </summary>
+        public void PutNode(NodeStruct<T> childNode)
+        {
+            _childIdToNodeMap.Add(childNode.Identifier, childNode);
+        }
+
+        /// <summary>
+        /// Looks up the child with the specified identifier; returns true and sets child only when it exists.
+        /// </summary>
+        public bool TryGetChild(string identifier, out NodeStruct<T> child)
+        {
+            return _childIdToNodeMap.TryGetValue(identifier, out child);
+        }
+
+        /// <summary>
+        /// Gets the child with the specified identifier. Returns null if child does not exist.
+        /// </summary>
+        public NodeStruct<T> GetChild(string identifier)
+        {
+            NodeStruct<T> child;
+            TryGetChild(identifier, out child);
+            return child;
+        }
+
+        /// <summary>
+        /// Gets the data from all children nodes synchronously, calling GetData exactly once per child.
+        /// </summary>
+        public IEnumerable<T> GetDataFromAllChildren()
+        {
+            // Materialize before returning: a lazy SelectMany would defer GetData until the
+            // caller enumerates, and would call it again on every re-enumeration of the result.
+            return this.SelectMany(child => child.GetData()).ToList();
+        }
+
+        /// <summary>
+        /// Gets an Enumerator for iterating through children nodes.
+        /// </summary>
+        public IEnumerator<NodeStruct<T>> GetEnumerator()
+        {
+            return _childIdToNodeMap.Values.GetEnumerator();
+        }
+
+        /// <summary>Gets a non-generic Enumerator for iterating through children nodes.</summary>
+        IEnumerator IEnumerable.GetEnumerator()
+        {
+            return GetEnumerator();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/d9a39030/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
index a03fd40..e13d724 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-using System;
 using System.Collections.Concurrent;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 
@@ -63,21 +62,5 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             _messageQueue.Add(gcm);
         }
-
-        /// <summary>
-        /// Tells whether there is a message in queue or not.
-        /// </summary>
-        /// <returns>True if queue is non empty, false otherwise.</returns>
-        internal bool HasMessage()
-        {
-            if (_messageQueue.Count != 0)
-            {
-                return true;
-            }
-            else
-            {
-                return false;
-            }
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d9a39030/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index caf831a..bf63af4 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -16,7 +16,6 @@
 // under the License.
 
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
@@ -51,13 +50,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         private readonly int _retryCount;
         private readonly int _sleepTime;
 
+        private readonly ChildNodeContainer<T> _childNodeContainer = new 
ChildNodeContainer<T>();
         private readonly NodeStruct<T> _parent;
-        private readonly List<NodeStruct<T>> _children;
-        private readonly Dictionary<string, NodeStruct<T>> _idToNodeMap;
         private readonly INameClient _nameClient;
         private readonly Sender _sender;
-        private readonly BlockingCollection<NodeStruct<T>> _nodesWithData;
-        private readonly object _thisLock = new object();
 
         /// <summary>
         /// Creates a new OperatorTopology object.
@@ -93,24 +89,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _sleepTime = sleepTime;
             _nameClient = networkService.NamingClient;
             _sender = sender;
-            _nodesWithData = new BlockingCollection<NodeStruct<T>>();
-            _children = new List<NodeStruct<T>>();
-            _idToNodeMap = new Dictionary<string, NodeStruct<T>>();
 
-            if (_selfId.Equals(rootId))
-            {
-                _parent = null;
-            }
-            else
-            {
-                _parent = new NodeStruct<T>(rootId);
-                _idToNodeMap[rootId] = _parent;
-            }
+            _parent = _selfId.Equals(rootId) ? null : new 
NodeStruct<T>(rootId);
+
             foreach (var childId in childIds)
             {
-                var node = new NodeStruct<T>(childId);
-                _children.Add(node);
-                _idToNodeMap[childId] = node;
+                _childNodeContainer.PutNode(new NodeStruct<T>(childId));
             }
         }
 
@@ -128,9 +112,9 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                     WaitForTaskRegistration(_parent.Identifier, _retryCount);
                 }
 
-                if (_children.Count > 0)
+                if (_childNodeContainer.Count > 0)
                 {
-                    foreach (var child in _children)
+                    foreach (var child in _childNodeContainer)
                     {
                         WaitForTaskRegistration(child.Identifier, _retryCount);
                     }
@@ -154,24 +138,23 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentException("Message must have a source");
             }
 
-            var sourceNode = FindNode(gcm.Source);
+            var sourceNode = (_parent != null && _parent.Identifier == 
gcm.Source) 
+                ? _parent 
+                : _childNodeContainer.GetChild(gcm.Source);
+
             if (sourceNode == null)
             {
                 throw new IllegalStateException("Received message from invalid 
task id: " + gcm.Source);
             }
 
-            lock (_thisLock)
-            {
-                _nodesWithData.Add(sourceNode);
-                var message = gcm as GroupCommunicationMessage<T>;
-
-                if (message == null)
-                {
-                    throw new NullReferenceException("message passed not of 
type GroupCommunicationMessage");
-                }
+            var message = gcm as GroupCommunicationMessage<T>;
 
-                sourceNode.AddData(message);
+            if (message == null)
+            {
+                throw new NullReferenceException("message passed not of type 
GroupCommunicationMessage");
             }
+
+            sourceNode.AddData(message);
         }
 
         /// <summary>
@@ -201,7 +184,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentNullException("message");
             }
 
-            foreach (var child in _children)
+            foreach (var child in _childNodeContainer)
             {
                 SendToNode(message, child);
             }
@@ -219,13 +202,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             {
                 throw new ArgumentNullException("messages");
             }
-            if (_children.Count <= 0)
+            if (_childNodeContainer.Count <= 0)
             {
                 return;
             }
 
-            var count = (int)Math.Ceiling(((double)messages.Count) / 
_children.Count);
-            ScatterHelper(messages, _children, count);
+            var numMessagesPerChild = 
(int)Math.Ceiling(((double)messages.Count) / _childNodeContainer.Count);
+            ScatterHelper(messages, _childNodeContainer.ToList(), 
numMessagesPerChild);
         }
 
         /// <summary>
@@ -246,7 +229,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentException("Count must be positive");
             }
 
-            ScatterHelper(messages, _children, count);
+            ScatterHelper(messages, _childNodeContainer.ToList(), count);
         }
 
         /// <summary>
@@ -262,7 +245,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             {
                 throw new ArgumentNullException("messages");
             }
-            if (order == null || order.Count != _children.Count)
+            if (order == null || order.Count != _childNodeContainer.Count)
             {
                 throw new ArgumentException("order cannot be null and must 
have the same number of elements as child tasks");
             }
@@ -270,8 +253,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             List<NodeStruct<T>> nodes = new List<NodeStruct<T>>();
             foreach (string taskId in order)
             {
-                NodeStruct<T> node = FindNode(taskId);
-                if (node == null)
+                NodeStruct<T> node;
+                if (!_childNodeContainer.TryGetChild(taskId, out node))
                 {
                     throw new IllegalStateException("Received message from 
invalid task id: " + taskId);
                 }
@@ -279,8 +262,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 nodes.Add(node);
             }
 
-            int count = (int)Math.Ceiling(((double)messages.Count) / 
_children.Count);
-            ScatterHelper(messages, nodes, count);
+            int numMessagesPerChild = 
(int)Math.Ceiling(((double)messages.Count) / _childNodeContainer.Count);
+            ScatterHelper(messages, nodes, numMessagesPerChild);
         }
 
         /// <summary>
@@ -289,7 +272,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// <returns>The parent Task's message</returns>
         public T ReceiveFromParent()
         {
-            T[] data = ReceiveFromNode(_parent);
+            T[] data = _parent.GetData();
             if (data == null || data.Length != 1)
             {
                 throw new InvalidOperationException("Cannot receive data from 
parent node");
@@ -300,7 +283,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
         public IList<T> ReceiveListFromParent()
         {
-            T[] data = ReceiveFromNode(_parent);
+            T[] data = _parent.GetData();
             if (data == null || data.Length == 0)
             {
                 throw new InvalidOperationException("Cannot receive data from 
parent node");
@@ -322,27 +305,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
                 throw new ArgumentNullException("reduceFunction");
             }
 
-            var receivedData = new List<T>();
-            var childrenToReceiveFrom = new 
HashSet<string>(_children.Select(node => node.Identifier));
-
-            while (childrenToReceiveFrom.Count > 0)
-            {
-                var childrenWithData = GetNodeWithData(childrenToReceiveFrom);
-
-                foreach (var child in childrenWithData)
-                {
-                    T[] data = ReceiveFromNode(child);
-                    if (data == null || data.Length != 1)
-                    {
-                        throw new InvalidOperationException("Received invalid 
data from child with id: " + child.Identifier);
-                    }
-
-                    receivedData.Add(data[0]);
-                    childrenToReceiveFrom.Remove(child.Identifier);
-                }
-            }
-
-            return reduceFunction.Reduce(receivedData);
+            return 
reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren());
         }
 
         public void OnError(Exception error)
@@ -355,78 +318,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
 
         public bool HasChildren()
         {
-            return _children.Count > 0;
-        }
-
-        /// <summary>
-        /// Get a set of nodes containing an incoming message and belonging to 
candidate set of nodes.
-        /// </summary>
-        /// <param name="nodeSetIdentifier">Candidate set of nodes from which 
data is to be received</param>
-        /// <returns>A Vector of NodeStruct with incoming data.</returns>
-        private IEnumerable<NodeStruct<T>> GetNodeWithData(IEnumerable<string> 
nodeSetIdentifier)
-        {
-            CancellationTokenSource timeoutSource = new 
CancellationTokenSource(_timeout);
-            List<NodeStruct<T>> nodesSubsetWithData = new 
List<NodeStruct<T>>();
-
-            try
-            {
-                lock (_thisLock)
-                {
-                    foreach (var identifier in nodeSetIdentifier)
-                    {
-                        if (!_idToNodeMap.ContainsKey(identifier))
-                        {
-                            throw new Exception("Trying to get data from the 
node not present in the node map");
-                        }
-
-                        if (_idToNodeMap[identifier].HasMessage())
-                        {
-                            nodesSubsetWithData.Add(_idToNodeMap[identifier]);
-                        }
-                    }
-
-                    if (nodesSubsetWithData.Count > 0)
-                    {
-                        return nodesSubsetWithData;
-                    }
-
-                    while (_nodesWithData.Count != 0)
-                    {
-                        _nodesWithData.Take();
-                    }
-                }
-
-                var potentialNode = _nodesWithData.Take();
-
-                while (!nodeSetIdentifier.Contains(potentialNode.Identifier))
-                {
-                    potentialNode = _nodesWithData.Take();
-                }
-
-                return new NodeStruct<T>[] { potentialNode };
-            }
-            catch (OperationCanceledException)
-            {
-                Logger.Log(Level.Error, "No data to read from child");
-                throw;
-            }
-            catch (ObjectDisposedException)
-            {
-                Logger.Log(Level.Error, "No data to read from child");
-                throw;
-            }
-            catch (InvalidOperationException)
-            {
-                Logger.Log(Level.Error, "No data to read from child");
-                throw;
-            }
+            return _childNodeContainer.Count > 0;
         }
 
         /// <summary>
         /// Sends the message to the Task represented by the given NodeStruct.
         /// </summary>
         /// <param name="message">The message to send</param>
-        /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send 
to</param>
         private void SendToNode(T message, NodeStruct<T> node)
         {
@@ -440,7 +338,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         /// Sends the list of messages to the Task represented by the given 
NodeStruct.
         /// </summary>
         /// <param name="messages">The list of messages to send</param>
-        /// <param name="msgType">The message type</param>
         /// <param name="node">The NodeStruct representing the Task to send 
to</param>
         private void SendToNode(IList<T> messages, NodeStruct<T> node)
         {
@@ -452,52 +349,29 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _sender.Send(gcm);
         }
 
-        /// <summary>
-        /// Receive a message from the Task represented by the given 
NodeStruct.
-        /// Removes the NodeStruct from the nodesWithData queue if requested.
-        /// </summary>
-        /// <param name="node">The node to receive from</param>
-        /// <returns>The byte array message from the node</returns>
-        private T[] ReceiveFromNode(NodeStruct<T> node)
+        private void ScatterHelper(IList<T> messages, IList<NodeStruct<T>> 
order, int numMessagesPerChild)
         {
-            var data = node.GetData();
-            return data;
-        }
-
-        /// <summary>
-        /// Find the NodeStruct with the given Task identifier.
-        /// </summary>
-        /// <param name="identifier">The identifier of the Task</param>
-        /// <returns>The NodeStruct</returns>
-        private NodeStruct<T> FindNode(string identifier)
-        {
-            NodeStruct<T> node;
-            return _idToNodeMap.TryGetValue(identifier, out node) ? node : 
null;
-        }
-
-        private void ScatterHelper(IList<T> messages, List<NodeStruct<T>> 
order, int count)
-        {
-            if (count <= 0)
+            if (numMessagesPerChild <= 0)
             {
                 throw new ArgumentException("Count must be positive");
             }
 
-            int i = 0;
+            int numMessagesSent = 0;
             foreach (var nodeStruct in order)
             {
                 // The last sublist might be smaller than count if the number 
of
                 // child tasks is not evenly divisible by count
-                int left = messages.Count - i;
-                int size = (left < count) ? left : count;
-                if (size <= 0)
+                int numMessagesLeft = messages.Count - numMessagesSent;
+                int numMessagesToSend = (numMessagesLeft < 
numMessagesPerChild) ? numMessagesLeft : numMessagesPerChild;
+                if (numMessagesToSend <= 0)
                 {
                     throw new ArgumentException("Scatter count must be 
positive");
                 }
 
-                IList<T> sublist = messages.ToList().GetRange(i, size);
+                IList<T> sublist = messages.ToList().GetRange(numMessagesSent, 
numMessagesToSend);
                 SendToNode(sublist, nodeStruct);
 
-                i += size;
+                numMessagesSent += numMessagesToSend;
             }
         }
 
@@ -511,8 +385,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
             for (int i = 0; i < retries; i++)
             {
-                System.Net.IPEndPoint endPoint;
-                if ((endPoint = _nameClient.Lookup(identifier)) != null)
+                if (_nameClient.Lookup(identifier) != null)
                 {
                     return;
                 }

http://git-wip-us.apache.org/repos/asf/reef/blob/d9a39030/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index 474f203..6472597 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -86,6 +86,7 @@ under the License.
     <Compile Include="Group\Operators\IOperatorSpec.cs" />
     <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" />
     <Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" />
+    <Compile Include="Group\Task\Impl\ChildNodeContainer.cs" />
     <Compile Include="Group\Task\IOperatorTopology.cs" />
     <Compile Include="Group\Operators\IReduceFunction.cs" />
     <Compile Include="Group\Operators\IReduceReceiver.cs" />

Reply via email to