[REEF-166] Adding tree topology for group communication This PR adds a tree topology for group communication: * Add tree topology * Update group communication driver code to use it * Update broadcast operator for tree topology * Add test cases for broadcast using tree topology
JIRA: Reef-166. (https://issues.apache.org/jira/browse/REEF-166) Author: Julia Wang Email: [email protected] This closes #90 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/aa36ade1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/aa36ade1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/aa36ade1 Branch: refs/heads/master Commit: aa36ade1eba18eafe928fdddf2a70265a01adc3c Parents: 6ce4f41 Author: Julia Wang <[email protected]> Authored: Thu Feb 19 10:46:12 2015 -0800 Committer: tmajest <[email protected]> Committed: Thu Feb 26 11:50:37 2015 -0800 ---------------------------------------------------------------------- .../KMeans/KMeansDriverHandlers.cs | 3 +- .../Group/Config/MpiConfigurationOptions.cs | 15 + .../Group/Driver/ICommunicationGroupDriver.cs | 8 +- .../Driver/Impl/CommunicationGroupDriver.cs | 61 +- .../Group/Driver/Impl/MpiDriver.cs | 5 +- .../Group/Operators/IReduceSender.cs | 2 +- .../Group/Operators/Impl/BroadcastReceiver.cs | 7 +- .../Group/Operators/Impl/ReduceSender.cs | 30 +- .../Group/Operators/Impl/ScatterReceiver.cs | 4 +- .../Group/Task/Impl/OperatorTopology.cs | 36 +- .../Group/Topology/FlatTopology.cs | 23 +- .../Group/Topology/TaskNode.cs | 35 +- .../Group/Topology/TopologyTypes.cs | 27 + .../Group/Topology/TreeTopology.cs | 233 +++++ .../Org.Apache.REEF.Network.csproj | 2 + .../BroadcastReduceDriver.cs | 2 + .../BroadcastReduceTest/BroadcastReduceTest.cs | 3 + .../Functional/MPI/MpiTestConfig.cs | 5 + .../Functional/MPI/MpiTestConstants.cs | 1 + .../MPI/ScatterReduceTest/MasterTask.cs | 3 +- .../ScatterReduceTest/ScatterReduceDriver.cs | 6 +- .../MPI/ScatterReduceTest/ScatterReduceTest.cs | 3 + .../Network/GroupCommunicationTests.cs | 100 +- .../GroupCommunicationTreeTopologyTests.cs | 931 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 9 + 25 files changed, 1430 insertions(+), 124 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs index 66c16bd..4920782 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs @@ -58,6 +58,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans // TODO: we may want to make this injectable private readonly int _partitionsNumber = 2; private readonly int _clustersNumber = 3; + private readonly int _fanOut = 2; private readonly int _totalEvaluators; private int _partitionInex = 0; private readonly IMpiDriver _mpiDriver; @@ -78,7 +79,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans _executionDirectory); _totalEvaluators = _partitionsNumber + 1; - _mpiDriver = new MpiDriver(Identifier, Constants.MasterTaskId, new AvroConfigurationSerializer()); + _mpiDriver = new MpiDriver(Identifier, Constants.MasterTaskId, _fanOut, new AvroConfigurationSerializer()); _commGroup = _mpiDriver.NewCommunicationGroup( Constants.KMeansCommunicationGroupName, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs index db96205..bdef88b 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs +++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs @@ -39,11 +39,26 @@ namespace Org.Apache.REEF.Network.Group.Config { } + [NamedParameter("Timeout for receiving data", defaultValue: "50000")] + public class Timeout : Name<int> + { + } + + [NamedParameter("Retry times", defaultValue: "5")] + public class RetryCount : Name<int> + { + } + [NamedParameter("Master task identifier")] public class MasterTaskId : Name<string> { } + [NamedParameter("with of the tree in topology", defaultValue:"2")] + public class FanOut : Name<int> + { + } + [NamedParameter("Serialized communication group configuration")] public class SerializedGroupConfigs : Name<ISet<string>> { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs index 0d41209..22caebd 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs @@ -19,6 +19,7 @@ using System.Collections.Generic; using Org.Apache.REEF.Network.Group.Operators.Impl; +using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Tang.Interface; namespace Org.Apache.REEF.Network.Group.Driver @@ -42,7 +43,7 @@ namespace Org.Apache.REEF.Network.Group.Driver /// <param name="operatorName">The name of the broadcast operator</param> /// <param name="spec">The specification that defines the Broadcast operator</param> /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns> - ICommunicationGroupDriver AddBroadcast<T>(string operatorName, BroadcastOperatorSpec<T> spec); + ICommunicationGroupDriver AddBroadcast<T>(string operatorName, BroadcastOperatorSpec<T> 
spec, TopologyTypes topologyType = TopologyTypes.Flat); /// <summary> /// Adds the Reduce MPI operator to the communication group. @@ -51,7 +52,7 @@ namespace Org.Apache.REEF.Network.Group.Driver /// <param name="operatorName">The name of the reduce operator</param> /// <param name="spec">The specification that defines the Reduce operator</param> /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns> - ICommunicationGroupDriver AddReduce<T>(string operatorName, ReduceOperatorSpec<T> spec); + ICommunicationGroupDriver AddReduce<T>(string operatorName, ReduceOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat); /// <summary> /// Adds the Scatter MPI operator to the communication group. @@ -59,8 +60,9 @@ namespace Org.Apache.REEF.Network.Group.Driver /// <typeparam name="T">The type of messages that operators will send</typeparam> /// <param name="operatorName">The name of the scatter operator</param> /// <param name="spec">The specification that defines the Scatter operator</param> + /// <param name="topologyType">type of topology used in the operaor</param> /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns> - ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec); + ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat); /// <summary> /// Finalizes the CommunicationGroupDriver. 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs index 5b57d9e..0b426a6 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs @@ -45,6 +45,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl private readonly int _numTasks; private int _tasksAdded; private bool _finalized; + private readonly int _fanOut; private readonly AvroConfigurationSerializer _confSerializer; @@ -61,8 +62,9 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="confSerializer">Used to serialize task configuration</param> public CommunicationGroupDriver( string groupName, - string driverId, + string driverId, int numTasks, + int fanOut, AvroConfigurationSerializer confSerializer) { _confSerializer = confSerializer; @@ -71,6 +73,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl _numTasks = numTasks; _tasksAdded = 0; _finalized = false; + _fanOut = fanOut; _topologyLock = new object(); @@ -82,7 +85,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <summary> /// Returns the list of task ids that belong to this Communication Group /// </summary> - public List<string> TaskIds { get; private set; } + public List<string> TaskIds { get; private set; } /// <summary> /// Adds the Broadcast MPI operator to the communication group. 
@@ -92,15 +95,26 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="spec">The specification that defines the Broadcast operator</param> /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns> public ICommunicationGroupDriver AddBroadcast<T>( - string operatorName, - BroadcastOperatorSpec<T> spec) + string operatorName, + BroadcastOperatorSpec<T> spec, + TopologyTypes topologyType = TopologyTypes.Flat) { if (_finalized) { throw new IllegalStateException("Can't add operators once the spec has been built."); } - ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec); + ITopology<T> topology; + + if (topologyType == TopologyTypes.Flat) + { + topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec); + } + else + { + topology = new TreeTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec, + _fanOut); + } _topologies[operatorName] = topology; _operatorSpecs[operatorName] = spec; @@ -115,15 +129,26 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="spec">The specification that defines the Reduce operator</param> /// <returns>The same CommunicationGroupDriver with the added Reduce operator info</returns> public ICommunicationGroupDriver AddReduce<T>( - string operatorName, - ReduceOperatorSpec<T> spec) + string operatorName, + ReduceOperatorSpec<T> spec, + TopologyTypes topologyType = TopologyTypes.Flat) { if (_finalized) { throw new IllegalStateException("Can't add operators once the spec has been built."); } - ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec); + ITopology<T> topology; + + if (topologyType == TopologyTypes.Flat) + { + topology = new FlatTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec); + } + else + { + topology = new TreeTopology<T>(operatorName, _groupName, spec.ReceiverId, _driverId, spec, + _fanOut); + } 
_topologies[operatorName] = topology; _operatorSpecs[operatorName] = spec; @@ -137,14 +162,24 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="operatorName">The name of the scatter operator</param> /// <param name="spec">The specification that defines the Scatter operator</param> /// <returns>The same CommunicationGroupDriver with the added Scatter operator info</returns> - public ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec) + public ICommunicationGroupDriver AddScatter<T>(string operatorName, ScatterOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat) { if (_finalized) { throw new IllegalStateException("Can't add operators once the spec has been built."); } - ITopology<T> topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec); + ITopology<T> topology; + + if (topologyType == TopologyTypes.Flat) + { + topology = new FlatTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec); + } + else + { + topology = new TreeTopology<T>(operatorName, _groupName, spec.SenderId, _driverId, spec, + _fanOut); + } _topologies[operatorName] = topology; _operatorSpecs[operatorName] = spec; @@ -245,14 +280,14 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl { var topology = _topologies[operatorName]; MethodInfo info = topology.GetType().GetMethod("AddTask"); - info.Invoke(topology, new[] { (object) taskId }); + info.Invoke(topology, new[] { (object)taskId }); } private IConfiguration GetOperatorConfiguration(string operatorName, string taskId) { var topology = _topologies[operatorName]; MethodInfo info = topology.GetType().GetMethod("GetTaskConfiguration"); - return (IConfiguration) info.Invoke(topology, new[] { (object) taskId }); + return (IConfiguration)info.Invoke(topology, new[] { (object)taskId }); } } -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs index 52eec91..40ca02c 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs @@ -55,6 +55,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl private readonly string _nameServerAddr; private readonly int _nameServerPort; private int _contextIds; + private int _fanOut; private readonly Dictionary<string, ICommunicationGroupDriver> _commGroups; private readonly AvroConfigurationSerializer _configSerializer; @@ -70,10 +71,12 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl public MpiDriver( [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId, [Parameter(typeof(MpiConfigurationOptions.MasterTaskId))] string masterTaskId, + [Parameter(typeof(MpiConfigurationOptions.FanOut))] int fanOut, AvroConfigurationSerializer configSerializer) { _driverId = driverId; _contextIds = -1; + _fanOut = fanOut; MasterTaskId = masterTaskId; _configSerializer = configSerializer; @@ -111,7 +114,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl throw new ArgumentException("Group Name already registered with MpiDriver"); } - var commGroup = new CommunicationGroupDriver(groupName, _driverId, numTasks, _configSerializer); + var commGroup = new CommunicationGroupDriver(groupName, _driverId, numTasks, _fanOut, _configSerializer); _commGroups[groupName] = commGroup; return commGroup; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs index e7ebf3c..97990bd 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs @@ -26,7 +26,7 @@ namespace Org.Apache.REEF.Network.Group.Operators public interface IReduceSender<T> : IMpiOperator<T> { /// <summary> - /// Sends data to the operator's ReduceReceiver to be aggregated. + /// Get reduced data from children, reduce with the data given, then sends reduced data to parent /// </summary> /// <param name="data">The data to send</param> void Send(T data); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs index 92821c8..d45fa24 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs @@ -85,7 +85,12 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <returns>The incoming message</returns> public T Receive() { - return _topology.ReceiveFromParent(); + var data = _topology.ReceiveFromParent(); + if (_topology.HasChildren()) + { + _topology.SendToChildren(data, MessageType.Data); + } + return data; } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs index 1c27759..a2c7d93 100644 --- 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs @@ -18,6 +18,7 @@ */ using System; +using System.Collections.Generic; using System.Reactive; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; @@ -37,6 +38,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl private readonly ICommunicationGroupNetworkObserver _networkHandler; private readonly OperatorTopology<T> _topology; + private readonly IReduceFunction<T> _reduceFunction; /// <summary> /// Creates a new ReduceSender. @@ -50,12 +52,13 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string operatorName, [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName, OperatorTopology<T> topology, - ICommunicationGroupNetworkObserver networkHandler) + ICommunicationGroupNetworkObserver networkHandler, + IReduceFunction<T> reduceFunction) { OperatorName = operatorName; GroupName = groupName; Version = DefaultVersion; - + _reduceFunction = reduceFunction; _networkHandler = networkHandler; _topology = topology; _topology.Initialize(); @@ -80,7 +83,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl public int Version { get; private set; } /// <summary> - /// Sends the data to the operator's ReduceReceiver to be aggregated. + /// Sends data to the operator's ReduceReceiver to be aggregated. 
/// </summary> /// <param name="data">The data to send</param> public void Send(T data) @@ -90,7 +93,26 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl throw new ArgumentNullException("data"); } - _topology.SendToParent(data, MessageType.Data); + //middle notes + if (_topology.HasChildren()) + { + var reducedValueOfChildren = _topology.ReceiveFromChildren(_reduceFunction); + + var mergeddData = new List<T>(); + mergeddData.Add(data); + if (reducedValueOfChildren != null) + { + mergeddData.Add(reducedValueOfChildren); + } + T reducedValue = _reduceFunction.Reduce(mergeddData); + + _topology.SendToParent(reducedValue, MessageType.Data); + } + else + { + //leaf node + _topology.SendToParent(data, MessageType.Data); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs index 8037f6f..6a9207d 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs @@ -91,7 +91,9 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <returns>The sublist of messages</returns> public List<T> Receive() { - return _topology.ReceiveListFromParent(); + List<T> elements = _topology.ReceiveListFromParent(); + _topology.ScatterToChildren(elements, MessageType.Data); + return elements; } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index 
d8e732f..fcec282 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -45,15 +45,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <typeparam name="T">The message type</typeparam> public class OperatorTopology<T> : IObserver<GroupCommunicationMessage> { - private const int DefaultTimeout = 10000; - private const int RetryCount = 5; - private static readonly Logger LOGGER = Logger.GetLogger(typeof(OperatorTopology<>)); private readonly string _groupName; private readonly string _operatorName; private readonly string _selfId; private string _driverId; + private readonly int _timeout; + private readonly int _retryCount; private readonly NodeStruct _parent; private readonly List<NodeStruct> _children; @@ -81,6 +80,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl [Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, [Parameter(typeof(MpiConfigurationOptions.DriverId))] string driverId, + [Parameter(typeof(MpiConfigurationOptions.Timeout))] int timrout, + [Parameter(typeof(MpiConfigurationOptions.RetryCount))] int retryCount, [Parameter(typeof(MpiConfigurationOptions.TopologyRootTaskId))] string rootId, [Parameter(typeof(MpiConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds, NetworkService<GroupCommunicationMessage> networkService, @@ -91,6 +92,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl _groupName = groupName; _selfId = taskId; _driverId = driverId; + _timeout = timrout; + _retryCount = retryCount; _codec = codec; _nameClient = networkService.NamingClient; _sender = sender; @@ -101,18 +104,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl if (_selfId.Equals(rootId)) { _parent = null; - foreach (string childId in childIds) - { - NodeStruct node = new NodeStruct(childId); - _children.Add(node); - _idToNodeMap[childId] 
= node; - } } else { _parent = new NodeStruct(rootId); _idToNodeMap[rootId] = _parent; } + foreach (string childId in childIds) + { + NodeStruct node = new NodeStruct(childId); + _children.Add(node); + _idToNodeMap[childId] = node; + } } /// <summary> @@ -126,14 +129,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { if (_parent != null) { - WaitForTaskRegistration(_parent.Identifier, RetryCount); + WaitForTaskRegistration(_parent.Identifier, _retryCount); } if (_children.Count > 0) { foreach (NodeStruct child in _children) { - WaitForTaskRegistration(child.Identifier, RetryCount); + WaitForTaskRegistration(child.Identifier, _retryCount); } } } @@ -212,7 +215,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } if (_children.Count <= 0) { - throw new ArgumentException("Cannot scatter, no children available"); + return; } int count = (int) Math.Ceiling(((double) messages.Count) / _children.Count); @@ -344,13 +347,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { } + public bool HasChildren() + { + return _children.Count > 0; + } + /// <summary> /// Get a node containing an incoming message. 
/// </summary> /// <returns>A NodeStruct with incoming data.</returns> private NodeStruct GetNodeWithData() { - CancellationTokenSource timeoutSource = new CancellationTokenSource(DefaultTimeout); + CancellationTokenSource timeoutSource = new CancellationTokenSource(_timeout); try { @@ -480,4 +488,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new IllegalStateException("Failed to initialize operator topology for node: " + identifier); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs index 39b5a99..892c82c 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs @@ -88,16 +88,19 @@ namespace Org.Apache.REEF.Network.Group.Topology GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class, _rootId); - foreach (string tId in _nodes.Keys) + if (taskId.Equals(_rootId)) { - if (!tId.Equals(_rootId)) + foreach (string tId in _nodes.Keys) { - confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>( - GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class, - tId); + if (!tId.Equals(_rootId)) + { + confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>( + GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class, + tId); + } } } - + if (OperatorSpec is BroadcastOperatorSpec<T>) { BroadcastOperatorSpec<T> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T>; @@ -171,25 +174,25 @@ namespace Org.Apache.REEF.Network.Group.Topology private void SetRootNode(string rootId) { - TaskNode rootNode = new TaskNode(_groupName, _operatorName, rootId, _driverId); + TaskNode rootNode = new 
TaskNode(_groupName, _operatorName, rootId, _driverId, true); _root = rootNode; foreach (TaskNode childNode in _nodes.Values) { rootNode.AddChild(childNode); - childNode.SetParent(rootNode); + childNode.Parent = rootNode; } } private void AddChild(string childId) { - TaskNode childNode = new TaskNode(_groupName, _operatorName, childId, _driverId); + TaskNode childNode = new TaskNode(_groupName, _operatorName, childId, _driverId, false); _nodes[childId] = childNode; if (_root != null) { _root.AddChild(childNode); - childNode.SetParent(_root); + childNode.Parent = _root; } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs index 65cdb5e..fe010a8 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs @@ -32,32 +32,57 @@ namespace Org.Apache.REEF.Network.Group.Topology private string _driverId; private TaskNode _parent; + private TaskNode _successor; + private bool _isRoot; + private readonly List<TaskNode> _children; public TaskNode( string groupName, string operatorName, string taskId, - string driverId) + string driverId, + bool isRoot) { _groupName = groupName; _operatorName = operatorName; _taskId = taskId; _driverId = driverId; + _isRoot = isRoot; _children = new List<TaskNode>(); } - public string TaskId { get; private set; } + public string TaskId + { + get { return _taskId; } + private set { _taskId = value; } + } + + public TaskNode Successor + { + get { return _successor; } + set { _successor = value; } + } + + public TaskNode Parent + { + get { return _parent; } + set { _parent = value; } + } public void AddChild(TaskNode child) { _children.Add(child); } - public void SetParent(TaskNode parent) - { 
- _parent = parent; + public int GetNumberOfChildren() { + return _children.Count; } + + public IList<TaskNode> GetChildren() + { + return _children; + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Topology/TopologyTypes.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TopologyTypes.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TopologyTypes.cs new file mode 100644 index 0000000..fdf2c52 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TopologyTypes.cs @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +namespace Org.Apache.REEF.Network.Group.Topology +{ + public enum TopologyTypes + { + Flat = 0, + Tree = 1 + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs new file mode 100644 index 0000000..50b2636 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+    /// <summary>
+    /// Arranges the tasks of a group communication operator into a tree rooted at the
+    /// root task. Each node has at most fanOut children. Nodes are attached in the order
+    /// they are attached to the tree, and one parent is filled completely before the next
+    /// one receives children (level order).
+    /// </summary>
+    /// <typeparam name="T">The message type handled by the operator</typeparam>
+    public class TreeTopology<T> : ITopology<T>
+    {
+        private readonly string _groupName;
+        private readonly string _operatorName;
+
+        private readonly string _rootId;
+        private readonly string _driverId;
+
+        // Every task node added so far, keyed by task id. This includes tasks added
+        // before the root; those are only linked into the tree once the root arrives.
+        private readonly Dictionary<string, TaskNode> _nodes;
+
+        // The root node; null until the root task has been added.
+        private TaskNode _root;
+
+        // The node that currently receives new children. Once it has _fanOut children,
+        // it advances along the Successor chain.
+        private TaskNode _logicalRoot;
+
+        // The most recently attached node. Its Successor is set to the next attached node,
+        // so the Successor chain records attachment order.
+        private TaskNode _prev;
+
+        private readonly int _fanOut;
+
+        /// <summary>
+        /// Creates a new TreeTopology.
+        /// </summary>
+        /// <param name="operatorName">The operator name</param>
+        /// <param name="groupName">The name of the topology's CommunicationGroup</param>
+        /// <param name="rootId">The root Task identifier</param>
+        /// <param name="driverId">The driver identifier</param>
+        /// <param name="operatorSpec">The operator specification</param>
+        /// <param name="fanOut">The maximum number of children of a tree node; must be positive</param>
+        /// <exception cref="ArgumentOutOfRangeException">If fanOut is zero or negative</exception>
+        public TreeTopology(
+            string operatorName,
+            string groupName,
+            string rootId,
+            string driverId,
+            IOperatorSpec<T> operatorSpec,
+            int fanOut)
+        {
+            if (fanOut <= 0)
+            {
+                // With a non-positive fan-out, AddTaskNode would advance _logicalRoot past the
+                // end of the Successor chain and then fail with a NullReferenceException.
+                throw new ArgumentOutOfRangeException("fanOut", fanOut, "The fan-out of a tree topology must be positive.");
+            }
+
+            _groupName = groupName;
+            _operatorName = operatorName;
+            _rootId = rootId;
+            _driverId = driverId;
+
+            OperatorSpec = operatorSpec;
+            _fanOut = fanOut;
+
+            _nodes = new Dictionary<string, TaskNode>();
+        }
+
+        /// <summary>
+        /// The operator specification used to build each task's configuration.
+        /// </summary>
+        public IOperatorSpec<T> OperatorSpec { get; set; }
+
+        /// <summary>
+        /// Gets the task configuration for the operator topology.
+        /// The configuration binds the codec, the task's parent id (or its own id when it has
+        /// no parent), the ids of its children, and the operator implementation for its role.
+        /// </summary>
+        /// <param name="taskId">The task identifier</param>
+        /// <returns>The task configuration</returns>
+        /// <exception cref="ArgumentNullException">If taskId is null</exception>
+        /// <exception cref="ArgumentException">If the task has not been added to the topology</exception>
+        /// <exception cref="NotSupportedException">If the operator spec type is not supported</exception>
+        public IConfiguration GetTaskConfiguration(string taskId)
+        {
+            if (taskId == null)
+            {
+                throw new ArgumentNullException("taskId", "TaskId is null when GetTaskConfiguration");
+            }
+
+            TaskNode selfTaskNode;
+            if (!_nodes.TryGetValue(taskId, out selfTaskNode))
+            {
+                throw new ArgumentException("Task has not been added to the topology");
+            }
+
+            // A node without a parent (the root, or a task not yet linked because the root has
+            // not been added) is bound as its own root task id.
+            TaskNode parent = selfTaskNode.Parent;
+            string parentId = parent == null ? selfTaskNode.TaskId : parent.TaskId;
+
+            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(typeof(ICodec<T>), OperatorSpec.Codec.GetType())
+                .BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
+                    GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
+                    parentId);
+
+            // Add all of this node's children.
+            foreach (TaskNode childNode in selfTaskNode.GetChildren())
+            {
+                confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>(
+                    GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class,
+                    childNode.TaskId);
+            }
+
+            // Cast once per spec type instead of an 'is' check followed by an 'as' cast.
+            var broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T>;
+            var reduceSpec = OperatorSpec as ReduceOperatorSpec<T>;
+            var scatterSpec = OperatorSpec as ScatterOperatorSpec<T>;
+
+            if (broadcastSpec != null)
+            {
+                if (taskId.Equals(broadcastSpec.SenderId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+                }
+            }
+            else if (reduceSpec != null)
+            {
+                confBuilder.BindImplementation(typeof(IReduceFunction<T>), reduceSpec.ReduceFunction.GetType());
+
+                if (taskId.Equals(reduceSpec.ReceiverId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+                }
+            }
+            else if (scatterSpec != null)
+            {
+                if (taskId.Equals(scatterSpec.SenderId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+                }
+            }
+            else
+            {
+                throw new NotSupportedException("Spec type not supported");
+            }
+
+            return confBuilder.Build();
+        }
+
+        /// <summary>
+        /// Adds a task to the topology. The root task becomes the tree root. Any other task is
+        /// attached under the current logical root, or is held until the root is added.
+        /// </summary>
+        /// <param name="taskId">The task identifier</param>
+        /// <exception cref="ArgumentNullException">If taskId is null or empty</exception>
+        /// <exception cref="ArgumentException">If the task has already been added</exception>
+        public void AddTask(string taskId)
+        {
+            if (string.IsNullOrEmpty(taskId))
+            {
+                throw new ArgumentNullException("taskId");
+            }
+            if (_nodes.ContainsKey(taskId))
+            {
+                throw new ArgumentException("Task has already been added to the topology");
+            }
+
+            if (taskId.Equals(_rootId))
+            {
+                SetRootNode(_rootId);
+            }
+            else
+            {
+                AddChild(taskId);
+            }
+        }
+
+        // Registers a non-root task. It is linked into the tree immediately if the root exists;
+        // otherwise SetRootNode links it later.
+        private void AddChild(string taskId)
+        {
+            TaskNode node = new TaskNode(_groupName, _operatorName, taskId, _driverId, false);
+            if (_logicalRoot != null)
+            {
+                AddTaskNode(node);
+            }
+            _nodes[taskId] = node;
+        }
+
+        // Creates the root node and links every task that was added before it.
+        // NOTE(review): those earlier tasks are linked in Dictionary enumeration order, which
+        // .NET does not guarantee to be insertion order.
+        private void SetRootNode(string rootId)
+        {
+            _root = new TaskNode(_groupName, _operatorName, rootId, _driverId, true);
+            _logicalRoot = _root;
+            _prev = _root;
+
+            foreach (TaskNode n in _nodes.Values)
+            {
+                AddTaskNode(n);
+            }
+            _nodes[rootId] = _root;
+        }
+
+        // Attaches node as a child of the current logical root. When the logical root already
+        // has _fanOut children, it first advances to the next node in attachment order. Because
+        // _fanOut >= 1, that successor has always been attached already, so it is never null.
+        private void AddTaskNode(TaskNode node)
+        {
+            if (_logicalRoot.GetNumberOfChildren() >= _fanOut)
+            {
+                _logicalRoot = _logicalRoot.Successor;
+            }
+            node.Parent = _logicalRoot;
+            _logicalRoot.AddChild(node);
+            _prev.Successor = node;
+            _prev = node;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index e7aebdb..5537863 100644 --- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -93,6 +93,8 @@ under the License. <Compile Include="Group\Topology\FlatTopology.cs" /> <Compile Include="Group\Topology\ITopology.cs" /> <Compile Include="Group\Topology\TaskNode.cs" /> + <Compile Include="Group\Topology\TopologyTypes.cs" /> + <Compile Include="Group\Topology\TreeTopology.cs" /> <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" /> <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" /> <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs index 1719671..947dfdc 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs @@ -57,6 +57,7 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest public BroadcastReduceDriver( [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators, [Parameter(typeof(MpiTestConfig.NumIterations))] int numIterations, + [Parameter(typeof(MpiTestConfig.FanOut))] int fanOut, AvroConfigurationSerializer confSerializer) 
{ Identifier = "BroadcastStartHandler"; @@ -66,6 +67,7 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest _mpiDriver = new MpiDriver( MpiTestConstants.DriverId, MpiTestConstants.MasterTaskId, + fanOut, confSerializer); _commGroup = _mpiDriver.NewCommunicationGroup( http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs index c74c923..17b28f4 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs @@ -64,6 +64,9 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest .BindNamedParameter<MpiTestConfig.NumIterations, int>( GenericType<MpiTestConfig.NumIterations>.Class, MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<MpiTestConfig.FanOut, int>( + GenericType<MpiTestConfig.FanOut>.Class, + MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture)) .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( GenericType<MpiTestConfig.NumEvaluators>.Class, numTasks.ToString(CultureInfo.InvariantCulture)) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs index 9450e20..6d93cab 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs @@ 
-32,5 +32,10 @@ namespace Org.Apache.REEF.Tests.Functional.MPI public class NumEvaluators : Name<int> { } + + [NamedParameter("tree width")] + public class FanOut : Name<int> + { + } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs index 5cff899..403990f 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs @@ -29,5 +29,6 @@ namespace Org.Apache.REEF.Tests.Functional.MPI public const string MasterTaskId = "MasterTask"; public const string SlaveTaskId = "SlaveTask-"; public const int NumIterations = 10; + public const int FanOut = 2; } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs index 8697911..e563534 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs @@ -50,8 +50,7 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest public byte[] Call(byte[] memento) { List<int> data = Enumerable.Range(1, 100).ToList(); - List<string> order = GetScatterOrder(); - _scatterSender.Send(data, order); + _scatterSender.Send(data); int sum = _sumReducer.Reduce(); _logger.Log(Level.Info, "Received sum: {0}", sum); 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs index 81cf745..e16a917 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs @@ -30,6 +30,7 @@ using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; +using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; @@ -53,6 +54,7 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest [Inject] public ScatterReduceDriver( [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators, + [Parameter(typeof(MpiTestConfig.FanOut))] int fanOut, AvroConfigurationSerializer confSerializer) { Identifier = "BroadcastStartHandler"; @@ -61,6 +63,7 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest _mpiDriver = new MpiDriver( MpiTestConstants.DriverId, MpiTestConstants.MasterTaskId, + fanOut, confSerializer); _commGroup = _mpiDriver.NewCommunicationGroup( @@ -70,7 +73,8 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest MpiTestConstants.ScatterOperatorName, new ScatterOperatorSpec<int>( MpiTestConstants.MasterTaskId, - new IntCodec())) + new IntCodec()), + TopologyTypes.Tree) .AddReduce( MpiTestConstants.ReduceOperatorName, new ReduceOperatorSpec<int>( 
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs index 1a0eef5..d03036c 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs @@ -64,6 +64,9 @@ namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( GenericType<MpiTestConfig.NumEvaluators>.Class, numTasks.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<MpiTestConfig.FanOut, int>( + GenericType<MpiTestConfig.FanOut>.Class, + MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture)) .Build(); HashSet<string> appDlls = new HashSet<string>(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs index a755892..a5297f6 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs @@ -98,9 +98,9 @@ namespace Org.Apache.REEF.Tests.Network string masterTaskId = "task0"; string driverId = "Driver Id"; int numTasks = 3; - int value = 1; + int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); ICommunicationGroupDriver 
commGroup = mpiDriver.NewCommunicationGroup( groupName, @@ -156,14 +156,14 @@ namespace Org.Apache.REEF.Tests.Network IBroadcastReceiver<int> broadcastReceiver2 = commGroups[2].GetBroadcastReceiver<int>(broadcastOperatorName); IReduceSender<int> triangleNumberSender2 = commGroups[2].GetReduceSender<int>(reduceOperatorName); - for (int i = 1; i <= 10; i++) + for (int j = 1; j <= 10; j++) { - broadcastSender.Send(i); + broadcastSender.Send(j); int n1 = broadcastReceiver1.Receive(); int n2 = broadcastReceiver2.Receive(); - Assert.AreEqual(i, n1); - Assert.AreEqual(i, n2); + Assert.AreEqual(j, n1); + Assert.AreEqual(j, n2); int triangleNum1 = TriangleNumber(n1); triangleNumberSender1.Send(triangleNum1); @@ -171,7 +171,7 @@ namespace Org.Apache.REEF.Tests.Network triangleNumberSender2.Send(triangleNum2); int sum = sumReducer.Reduce(); - int expected = TriangleNumber(i) * (numTasks - 1); + int expected = TriangleNumber(j) * (numTasks - 1); Assert.AreEqual(sum, expected); } } @@ -185,9 +185,9 @@ namespace Org.Apache.REEF.Tests.Network string masterTaskId = "task0"; string driverId = "Driver Id"; int numTasks = 5; - int value = 1; + int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); ICommunicationGroupDriver commGroup = mpiDriver.NewCommunicationGroup( groupName, @@ -260,10 +260,10 @@ namespace Org.Apache.REEF.Tests.Network sender.Send(data, order); - ScatterReceiveReduce(receiver1, sumSender1); - ScatterReceiveReduce(receiver2, sumSender2); - ScatterReceiveReduce(receiver3, sumSender3); ScatterReceiveReduce(receiver4, sumSender4); + ScatterReceiveReduce(receiver3, sumSender3); + ScatterReceiveReduce(receiver2, sumSender2); + ScatterReceiveReduce(receiver1, sumSender1); int sum = sumReducer.Reduce(); @@ -291,20 +291,16 @@ namespace Org.Apache.REEF.Tests.Network string operatorName = "broadcast"; string 
masterTaskId = "task0"; string driverId = "Driver Id"; - int numTasks = 3; + int numTasks = 10; int value = 1337; + int fanOut = 3; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) .AddBroadcast(operatorName, new BroadcastOperatorSpec<int>(masterTaskId, new IntCodec())) .Build(); - //for (int i = 0; i < numTasks; i++) - //{ - // nameServer.Register("task" + i, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 21)); - //} - List<IConfiguration> partialConfigs = new List<IConfiguration>(); for (int i = 0; i < numTasks; i++) { @@ -358,8 +354,9 @@ namespace Org.Apache.REEF.Tests.Network int value1 = 1337; int value2 = 42; int value3 = 99; + int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) .AddBroadcast(operatorName, new BroadcastOperatorSpec<int>(masterTaskId, new IntCodec())) @@ -418,22 +415,15 @@ namespace Org.Apache.REEF.Tests.Network [TestMethod] public void TestReduceOperator() { - //NameServer nameServer = new NameServer(0); - string groupName = "group1"; string operatorName = "reduce"; int numTasks = 4; - IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", 2, new AvroConfigurationSerializer()); var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) .AddReduce(operatorName, new ReduceOperatorSpec<int>("task0", new IntCodec(), new SumFunction())) .Build(); - //for (int i = 0; i < numTasks; i++) - //{ - // nameServer.Register("task" + i, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 21)); - //} - 
List<IConfiguration> partialConfigs = new List<IConfiguration>(); for (int i = 0; i < numTasks; i++) { @@ -472,9 +462,9 @@ namespace Org.Apache.REEF.Tests.Network Assert.IsNotNull(sender2); Assert.IsNotNull(sender3); + sender3.Send(5); sender1.Send(1); sender2.Send(3); - sender3.Send(5); Assert.AreEqual(9, receiver.Reduce()); } @@ -485,7 +475,7 @@ namespace Org.Apache.REEF.Tests.Network string groupName = "group1"; string operatorName = "reduce"; int numTasks = 4; - IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", 2, new AvroConfigurationSerializer()); var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) .AddReduce(operatorName, new ReduceOperatorSpec<int>("task0", new IntCodec(), new SumFunction())) @@ -529,44 +519,38 @@ namespace Org.Apache.REEF.Tests.Network Assert.IsNotNull(sender2); Assert.IsNotNull(sender3); + sender3.Send(5); sender1.Send(1); sender2.Send(3); - sender3.Send(5); Assert.AreEqual(9, receiver.Reduce()); + sender3.Send(6); sender1.Send(2); sender2.Send(4); - sender3.Send(6); Assert.AreEqual(12, receiver.Reduce()); + sender3.Send(9); sender1.Send(3); sender2.Send(6); - sender3.Send(9); Assert.AreEqual(18, receiver.Reduce()); } [TestMethod] public void TestScatterOperator() { - //NameServer nameServer = new NameServer(0); - string groupName = "group1"; string operatorName = "scatter"; string masterTaskId = "task0"; string driverId = "Driver Id"; int numTasks = 5; + int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec())) .Build(); - //for (int i = 0; i < numTasks; i++) - //{ - // nameServer.Register("task" + i, new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), 21)); - //} - List<IConfiguration> partialConfigs = new List<IConfiguration>(); for (int i = 0; i < numTasks; i++) { @@ -620,25 +604,19 @@ namespace Org.Apache.REEF.Tests.Network [TestMethod] public void TestScatterOperator2() { - //NameServer nameServer = new NameServer(0); - string groupName = "group1"; string operatorName = "scatter"; string masterTaskId = "task0"; string driverId = "Driver Id"; int numTasks = 5; + int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec())) .Build(); - //for (int i = 0; i < numTasks; i++) - //{ - // nameServer.Register("task" + i, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 21)); - //} - List<IConfiguration> partialConfigs = new List<IConfiguration>(); for (int i = 0; i < numTasks; i++) { @@ -703,25 +681,19 @@ namespace Org.Apache.REEF.Tests.Network [TestMethod] public void TestScatterOperator3() { - //NameServer nameServer = new NameServer(0); - string groupName = "group1"; string operatorName = "scatter"; string masterTaskId = "task0"; string driverId = "Driver Id"; int numTasks = 4; + int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec())) .Build(); - //for (int i = 0; i < numTasks; i++) - //{ - // nameServer.Register("task" + i, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 21)); - //} - List<IConfiguration> partialConfigs = new List<IConfiguration>(); for 
(int i = 0; i < numTasks; i++) { @@ -783,25 +755,19 @@ namespace Org.Apache.REEF.Tests.Network [TestMethod] public void TestScatterOperator4() { - //NameServer nameServer = new NameServer(0); - string groupName = "group1"; string operatorName = "scatter"; string masterTaskId = "task0"; string driverId = "Driver Id"; int numTasks = 4; + int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new AvroConfigurationSerializer()); + IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec())) .Build(); - //for (int i = 0; i < numTasks; i++) - //{ - // nameServer.Register("task" + i, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 21)); - //} - List<IConfiguration> partialConfigs = new List<IConfiguration>(); for (int i = 0; i < numTasks; i++) { @@ -895,7 +861,7 @@ namespace Org.Apache.REEF.Tests.Network handler, new StringIdentifierFactory(), new GroupCommunicationMessageCodec()); } - private GroupCommunicationMessage CreateGcm(string message, string from, string to) + private GroupCommunicationMessage CreateGcm(string message, string from, string to) { byte[] data = Encoding.UTF8.GetBytes(message); return new GroupCommunicationMessage("g1", "op1", from, to, data, MessageType.Data); @@ -927,4 +893,4 @@ namespace Org.Apache.REEF.Tests.Network } } } -} +} \ No newline at end of file
