Repository: reef Updated Branches: refs/heads/master 528706a0d -> d9a390303
[REEF-1469] Simplify logic in OperatorTopology This addressed the issue by * Replacing logic in GetNodesWithData with synchronous calls to GetData. This is okay because we will need to wait for all children nodes to receive data before proceeding. * Renaming variables for clarity. * Adding ChildNodeContainer to simplify management of children nodes. JIRA: [REEF-1469](https://issues.apache.org/jira/browse/REEF-1469) Pull Request: Closes #1051 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/d9a39030 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/d9a39030 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/d9a39030 Branch: refs/heads/master Commit: d9a390303419f7997f6398054c5df80f40a77829 Parents: 528706a Author: Andrew Chung <[email protected]> Authored: Wed Jun 22 17:18:13 2016 -0700 Committer: dhruv <[email protected]> Committed: Mon Jun 27 16:39:53 2016 -0700 ---------------------------------------------------------------------- .../Group/Task/Impl/ChildNodeContainer.cs | 92 +++++++++ .../Group/Task/Impl/NodeStruct.cs | 17 -- .../Group/Task/Impl/OperatorTopology.cs | 205 ++++--------------- .../Org.Apache.REEF.Network.csproj | 1 + 4 files changed, 132 insertions(+), 183 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/d9a39030/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs new file mode 100644 index 0000000..297212e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/ChildNodeContainer.cs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Utilities.Attributes; + +namespace Org.Apache.REEF.Network.Group.Task.Impl +{ + /// <summary> + /// A container for children nodes in Group Communications. + /// </summary> + [NotThreadSafe] + internal sealed class ChildNodeContainer<T> : IEnumerable<NodeStruct<T>> + { + private readonly Dictionary<string, NodeStruct<T>> _childIdToNodeMap = + new Dictionary<string, NodeStruct<T>>(); + + /// <summary> + /// Gets the number of children. + /// </summary> + public int Count + { + get { return _childIdToNodeMap.Count; } + } + + /// <summary> + /// Puts the child node into the container. + /// </summary> + public void PutNode(NodeStruct<T> childNode) + { + _childIdToNodeMap.Add(childNode.Identifier, childNode); + } + + /// <summary> + /// Gets the child with the specified identifier. + /// </summary> + public bool TryGetChild(string identifier, out NodeStruct<T> child) + { + return _childIdToNodeMap.TryGetValue(identifier, out child); + } + + /// <summary> + /// Gets the child with the specified identifier. Returns null if child does not exist. + /// </summary> + public NodeStruct<T> GetChild(string identifier) + { + NodeStruct<T> child; + TryGetChild(identifier, out child); + return child; + } + + /// <summary> + /// Gets the data from all children nodes synchronously. + /// </summary> + public IEnumerable<T> GetDataFromAllChildren() + { + return this.SelectMany(child => child.GetData()); + } + + /// <summary> + /// Gets an Enumerator for iterating through children nodes. + /// </summary> + public IEnumerator<NodeStruct<T>> GetEnumerator() + { + return _childIdToNodeMap.Values.GetEnumerator(); + } + + /// <summary> + /// Gets an Enumerator for iterating through children nodes. + /// </summary> + IEnumerator IEnumerable.GetEnumerator() + { + return GetEnumerator(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/d9a39030/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs index a03fd40..e13d724 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -using System; using System.Collections.Concurrent; using Org.Apache.REEF.Network.Group.Driver.Impl; @@ -63,21 +62,5 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { _messageQueue.Add(gcm); } - - /// <summary> - /// Tells whether there is a message in queue or not. - /// </summary> - /// <returns>True if queue is non empty, false otherwise.</returns> - internal bool HasMessage() - { - if (_messageQueue.Count != 0) - { - return true; - } - else - { - return false; - } - } } } http://git-wip-us.apache.org/repos/asf/reef/blob/d9a39030/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index caf831a..bf63af4 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -16,7 +16,6 @@ // under the License. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -51,13 +50,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl private readonly int _retryCount; private readonly int _sleepTime; + private readonly ChildNodeContainer<T> _childNodeContainer = new ChildNodeContainer<T>(); private readonly NodeStruct<T> _parent; - private readonly List<NodeStruct<T>> _children; - private readonly Dictionary<string, NodeStruct<T>> _idToNodeMap; private readonly INameClient _nameClient; private readonly Sender _sender; - private readonly BlockingCollection<NodeStruct<T>> _nodesWithData; - private readonly object _thisLock = new object(); /// <summary> /// Creates a new OperatorTopology object. @@ -93,24 +89,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl _sleepTime = sleepTime; _nameClient = networkService.NamingClient; _sender = sender; - _nodesWithData = new BlockingCollection<NodeStruct<T>>(); - _children = new List<NodeStruct<T>>(); - _idToNodeMap = new Dictionary<string, NodeStruct<T>>(); - if (_selfId.Equals(rootId)) - { - _parent = null; - } - else - { - _parent = new NodeStruct<T>(rootId); - _idToNodeMap[rootId] = _parent; - } + _parent = _selfId.Equals(rootId) ? null : new NodeStruct<T>(rootId); + foreach (var childId in childIds) { - var node = new NodeStruct<T>(childId); - _children.Add(node); - _idToNodeMap[childId] = node; + _childNodeContainer.PutNode(new NodeStruct<T>(childId)); } } @@ -128,9 +112,9 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl WaitForTaskRegistration(_parent.Identifier, _retryCount); } - if (_children.Count > 0) + if (_childNodeContainer.Count > 0) { - foreach (var child in _children) + foreach (var child in _childNodeContainer) { WaitForTaskRegistration(child.Identifier, _retryCount); } @@ -154,24 +138,23 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new ArgumentException("Message must have a source"); } - var sourceNode = FindNode(gcm.Source); + var sourceNode = (_parent != null && _parent.Identifier == gcm.Source) + ? _parent + : _childNodeContainer.GetChild(gcm.Source); + if (sourceNode == null) { throw new IllegalStateException("Received message from invalid task id: " + gcm.Source); } - lock (_thisLock) - { - _nodesWithData.Add(sourceNode); - var message = gcm as GroupCommunicationMessage<T>; - - if (message == null) - { - throw new NullReferenceException("message passed not of type GroupCommunicationMessage"); - } + var message = gcm as GroupCommunicationMessage<T>; - sourceNode.AddData(message); + if (message == null) + { + throw new NullReferenceException("message passed not of type GroupCommunicationMessage"); } + + sourceNode.AddData(message); } /// <summary> @@ -201,7 +184,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new ArgumentNullException("message"); } - foreach (var child in _children) + foreach (var child in _childNodeContainer) { SendToNode(message, child); } @@ -219,13 +202,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { throw new ArgumentNullException("messages"); } - if (_children.Count <= 0) + if (_childNodeContainer.Count <= 0) { return; } - var count = (int)Math.Ceiling(((double)messages.Count) / _children.Count); - ScatterHelper(messages, _children, count); + var numMessagesPerChild = (int)Math.Ceiling(((double)messages.Count) / _childNodeContainer.Count); + ScatterHelper(messages, _childNodeContainer.ToList(), numMessagesPerChild); } /// <summary> @@ -246,7 +229,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new ArgumentException("Count must be positive"); } - ScatterHelper(messages, _children, count); + ScatterHelper(messages, _childNodeContainer.ToList(), count); } /// <summary> @@ -262,7 +245,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { throw new ArgumentNullException("messages"); } - if (order == null || order.Count != _children.Count) + if (order == null || order.Count != _childNodeContainer.Count) { throw new ArgumentException("order cannot be null and must have the same number of elements as child tasks"); } @@ -270,8 +253,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl List<NodeStruct<T>> nodes = new List<NodeStruct<T>>(); foreach (string taskId in order) { - NodeStruct<T> node = FindNode(taskId); - if (node == null) + NodeStruct<T> node; + if (!_childNodeContainer.TryGetChild(taskId, out node)) { throw new IllegalStateException("Received message from invalid task id: " + taskId); } @@ -279,8 +262,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl nodes.Add(node); } - int count = (int)Math.Ceiling(((double)messages.Count) / _children.Count); - ScatterHelper(messages, nodes, count); + int numMessagesPerChild = (int)Math.Ceiling(((double)messages.Count) / _childNodeContainer.Count); + ScatterHelper(messages, nodes, numMessagesPerChild); } /// <summary> @@ -289,7 +272,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <returns>The parent Task's message</returns> public T ReceiveFromParent() { - T[] data = ReceiveFromNode(_parent); + T[] data = _parent.GetData(); if (data == null || data.Length != 1) { throw new InvalidOperationException("Cannot receive data from parent node"); @@ -300,7 +283,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl public IList<T> ReceiveListFromParent() { - T[] data = ReceiveFromNode(_parent); + T[] data = _parent.GetData(); if (data == null || data.Length == 0) { throw new InvalidOperationException("Cannot receive data from parent node"); @@ -322,27 +305,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new ArgumentNullException("reduceFunction"); } - var receivedData = new List<T>(); - var childrenToReceiveFrom = new HashSet<string>(_children.Select(node => node.Identifier)); - - while (childrenToReceiveFrom.Count > 0) - { - var childrenWithData = GetNodeWithData(childrenToReceiveFrom); - - foreach (var child in childrenWithData) - { - T[] data = ReceiveFromNode(child); - if (data == null || data.Length != 1) - { - throw new InvalidOperationException("Received invalid data from child with id: " + child.Identifier); - } - - receivedData.Add(data[0]); - childrenToReceiveFrom.Remove(child.Identifier); - } - } - - return reduceFunction.Reduce(receivedData); + return reduceFunction.Reduce(_childNodeContainer.GetDataFromAllChildren()); } public void OnError(Exception error) @@ -355,78 +318,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl public bool HasChildren() { - return _children.Count > 0; - } - - /// <summary> - /// Get a set of nodes containing an incoming message and belonging to candidate set of nodes. - /// </summary> - /// <param name="nodeSetIdentifier">Candidate set of nodes from which data is to be received</param> - /// <returns>A Vector of NodeStruct with incoming data.</returns> - private IEnumerable<NodeStruct<T>> GetNodeWithData(IEnumerable<string> nodeSetIdentifier) - { - CancellationTokenSource timeoutSource = new CancellationTokenSource(_timeout); - List<NodeStruct<T>> nodesSubsetWithData = new List<NodeStruct<T>>(); - - try - { - lock (_thisLock) - { - foreach (var identifier in nodeSetIdentifier) - { - if (!_idToNodeMap.ContainsKey(identifier)) - { - throw new Exception("Trying to get data from the node not present in the node map"); - } - - if (_idToNodeMap[identifier].HasMessage()) - { - nodesSubsetWithData.Add(_idToNodeMap[identifier]); - } - } - - if (nodesSubsetWithData.Count > 0) - { - return nodesSubsetWithData; - } - - while (_nodesWithData.Count != 0) - { - _nodesWithData.Take(); - } - } - - var potentialNode = _nodesWithData.Take(); - - while (!nodeSetIdentifier.Contains(potentialNode.Identifier)) - { - potentialNode = _nodesWithData.Take(); - } - - return new NodeStruct<T>[] { potentialNode }; - } - catch (OperationCanceledException) - { - Logger.Log(Level.Error, "No data to read from child"); - throw; - } - catch (ObjectDisposedException) - { - Logger.Log(Level.Error, "No data to read from child"); - throw; - } - catch (InvalidOperationException) - { - Logger.Log(Level.Error, "No data to read from child"); - throw; - } + return _childNodeContainer.Count > 0; } /// <summary> /// Sends the message to the Task represented by the given NodeStruct. /// </summary> /// <param name="message">The message to send</param> - /// <param name="msgType">The message type</param> /// <param name="node">The NodeStruct representing the Task to send to</param> private void SendToNode(T message, NodeStruct<T> node) { @@ -440,7 +338,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Sends the list of messages to the Task represented by the given NodeStruct. /// </summary> /// <param name="messages">The list of messages to send</param> - /// <param name="msgType">The message type</param> /// <param name="node">The NodeStruct representing the Task to send to</param> private void SendToNode(IList<T> messages, NodeStruct<T> node) { @@ -452,52 +349,29 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl _sender.Send(gcm); } - /// <summary> - /// Receive a message from the Task represented by the given NodeStruct. - /// Removes the NodeStruct from the nodesWithData queue if requested. - /// </summary> - /// <param name="node">The node to receive from</param> - /// <returns>The byte array message from the node</returns> - private T[] ReceiveFromNode(NodeStruct<T> node) + private void ScatterHelper(IList<T> messages, IList<NodeStruct<T>> order, int numMessagesPerChild) { - var data = node.GetData(); - return data; - } - - /// <summary> - /// Find the NodeStruct with the given Task identifier. - /// </summary> - /// <param name="identifier">The identifier of the Task</param> - /// <returns>The NodeStruct</returns> - private NodeStruct<T> FindNode(string identifier) - { - NodeStruct<T> node; - return _idToNodeMap.TryGetValue(identifier, out node) ? node : null; - } - - private void ScatterHelper(IList<T> messages, List<NodeStruct<T>> order, int count) - { - if (count <= 0) + if (numMessagesPerChild <= 0) { throw new ArgumentException("Count must be positive"); } - int i = 0; + int numMessagesSent = 0; foreach (var nodeStruct in order) { // The last sublist might be smaller than count if the number of // child tasks is not evenly divisible by count - int left = messages.Count - i; - int size = (left < count) ? left : count; - if (size <= 0) + int numMessagesLeft = messages.Count - numMessagesSent; + int numMessagesToSend = (numMessagesLeft < numMessagesPerChild) ? numMessagesLeft : numMessagesPerChild; + if (numMessagesToSend <= 0) { throw new ArgumentException("Scatter count must be positive"); } - IList<T> sublist = messages.ToList().GetRange(i, size); + IList<T> sublist = messages.ToList().GetRange(numMessagesSent, numMessagesToSend); SendToNode(sublist, nodeStruct); - i += size; + numMessagesSent += numMessagesToSend; } } @@ -511,8 +385,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { for (int i = 0; i < retries; i++) { - System.Net.IPEndPoint endPoint; - if ((endPoint = _nameClient.Lookup(identifier)) != null) + if (_nameClient.Lookup(identifier) != null) { return; } http://git-wip-us.apache.org/repos/asf/reef/blob/d9a39030/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index 474f203..6472597 100644 --- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -86,6 +86,7 @@ under the License. <Compile Include="Group\Operators\IOperatorSpec.cs" /> <Compile Include="Group\Pipelining\StreamingPipelineMessageCodec.cs" /> <Compile Include="Group\Task\ICommunicationGroupClientInternal.cs" /> + <Compile Include="Group\Task\Impl\ChildNodeContainer.cs" /> <Compile Include="Group\Task\IOperatorTopology.cs" /> <Compile Include="Group\Operators\IReduceFunction.cs" /> <Compile Include="Group\Operators\IReduceReceiver.cs" />
