[REEF-166] Adding tree topology for group communication

This PR adds a tree topology for group communication:
  * Add the tree topology
  * Update the group communication driver code to use it
  * Update the broadcast operator for the tree topology
  * Add test cases for broadcast using the tree topology

JIRA: REEF-166 (https://issues.apache.org/jira/browse/REEF-166)

Author: Julia Wang  Email: [email protected]

This closes #90


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/aa36ade1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/aa36ade1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/aa36ade1

Branch: refs/heads/master
Commit: aa36ade1eba18eafe928fdddf2a70265a01adc3c
Parents: 6ce4f41
Author: Julia Wang <[email protected]>
Authored: Thu Feb 19 10:46:12 2015 -0800
Committer: tmajest <[email protected]>
Committed: Thu Feb 26 11:50:37 2015 -0800

----------------------------------------------------------------------
 .../KMeans/KMeansDriverHandlers.cs              |   3 +-
 .../Group/Config/MpiConfigurationOptions.cs     |  15 +
 .../Group/Driver/ICommunicationGroupDriver.cs   |   8 +-
 .../Driver/Impl/CommunicationGroupDriver.cs     |  61 +-
 .../Group/Driver/Impl/MpiDriver.cs              |   5 +-
 .../Group/Operators/IReduceSender.cs            |   2 +-
 .../Group/Operators/Impl/BroadcastReceiver.cs   |   7 +-
 .../Group/Operators/Impl/ReduceSender.cs        |  30 +-
 .../Group/Operators/Impl/ScatterReceiver.cs     |   4 +-
 .../Group/Task/Impl/OperatorTopology.cs         |  36 +-
 .../Group/Topology/FlatTopology.cs              |  23 +-
 .../Group/Topology/TaskNode.cs                  |  35 +-
 .../Group/Topology/TopologyTypes.cs             |  27 +
 .../Group/Topology/TreeTopology.cs              | 233 +++++
 .../Org.Apache.REEF.Network.csproj              |   2 +
 .../BroadcastReduceDriver.cs                    |   2 +
 .../BroadcastReduceTest/BroadcastReduceTest.cs  |   3 +
 .../Functional/MPI/MpiTestConfig.cs             |   5 +
 .../Functional/MPI/MpiTestConstants.cs          |   1 +
 .../MPI/ScatterReduceTest/MasterTask.cs         |   3 +-
 .../ScatterReduceTest/ScatterReduceDriver.cs    |   6 +-
 .../MPI/ScatterReduceTest/ScatterReduceTest.cs  |   3 +
 .../Network/GroupCommunicationTests.cs          | 100 +-
 .../GroupCommunicationTreeTopologyTests.cs      | 931 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   9 +
 25 files changed, 1430 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index 66c16bd..4920782 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -58,6 +58,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
         // TODO: we may want to make this injectable
         private readonly int _partitionsNumber = 2;
         private readonly int _clustersNumber = 3;
+        private readonly int _fanOut = 2;
         private readonly int _totalEvaluators;
         private int _partitionInex = 0;
         private readonly IMpiDriver _mpiDriver;
@@ -78,7 +79,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
                 _executionDirectory); 
 
             _totalEvaluators = _partitionsNumber + 1;
-            _mpiDriver = new MpiDriver(Identifier, Constants.MasterTaskId, new 
AvroConfigurationSerializer());
+            _mpiDriver = new MpiDriver(Identifier, Constants.MasterTaskId, 
_fanOut, new AvroConfigurationSerializer());
 
             _commGroup = _mpiDriver.NewCommunicationGroup(
                Constants.KMeansCommunicationGroupName,

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
index db96205..bdef88b 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/MpiConfigurationOptions.cs
@@ -39,11 +39,26 @@ namespace Org.Apache.REEF.Network.Group.Config
         {
         }
 
+        [NamedParameter("Timeout for receiving data", defaultValue: "50000")]
+        public class Timeout : Name<int>
+        {
+        }
+
+        [NamedParameter("Retry times", defaultValue: "5")]
+        public class RetryCount : Name<int>
+        {
+        }
+
         [NamedParameter("Master task identifier")]
         public class MasterTaskId : Name<string>
         {
         }
 
+        [NamedParameter("with of the tree in topology", defaultValue:"2")]
+        public class FanOut : Name<int>
+        {
+        }
+
         [NamedParameter("Serialized communication group configuration")]
         public class SerializedGroupConfigs : Name<ISet<string>>
         {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
index 0d41209..22caebd 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs
@@ -19,6 +19,7 @@
 
 using System.Collections.Generic;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Tang.Interface;
 
 namespace Org.Apache.REEF.Network.Group.Driver
@@ -42,7 +43,7 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <param name="operatorName">The name of the broadcast 
operator</param>
         /// <param name="spec">The specification that defines the Broadcast 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
-        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, 
BroadcastOperatorSpec<T> spec);
+        ICommunicationGroupDriver AddBroadcast<T>(string operatorName, 
BroadcastOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
 
         /// <summary>
         /// Adds the Reduce MPI operator to the communication group.
@@ -51,7 +52,7 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <param name="operatorName">The name of the reduce operator</param>
         /// <param name="spec">The specification that defines the Reduce 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
-        ICommunicationGroupDriver AddReduce<T>(string operatorName, 
ReduceOperatorSpec<T> spec);
+        ICommunicationGroupDriver AddReduce<T>(string operatorName, 
ReduceOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
 
         /// <summary>
         /// Adds the Scatter MPI operator to the communication group.
@@ -59,8 +60,9 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <typeparam name="T">The type of messages that operators will 
send</typeparam>
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="spec">The specification that defines the Scatter 
operator</param>
+        /// <param name="topologyType">type of topology used in the 
operaor</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter 
operator info</returns>
-        ICommunicationGroupDriver AddScatter<T>(string operatorName, 
ScatterOperatorSpec<T> spec);
+        ICommunicationGroupDriver AddScatter<T>(string operatorName, 
ScatterOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat);
 
         /// <summary>
         /// Finalizes the CommunicationGroupDriver.

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
index 5b57d9e..0b426a6 100644
--- 
a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs
@@ -45,6 +45,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         private readonly int _numTasks;
         private int _tasksAdded;
         private bool _finalized;
+        private readonly int _fanOut;
 
         private readonly AvroConfigurationSerializer _confSerializer;
 
@@ -61,8 +62,9 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="confSerializer">Used to serialize task 
configuration</param>
         public CommunicationGroupDriver(
             string groupName,
-            string driverId, 
+            string driverId,
             int numTasks,
+            int fanOut,
             AvroConfigurationSerializer confSerializer)
         {
             _confSerializer = confSerializer;
@@ -71,6 +73,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
             _numTasks = numTasks;
             _tasksAdded = 0;
             _finalized = false;
+            _fanOut = fanOut;
 
             _topologyLock = new object();
 
@@ -82,7 +85,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <summary>
         /// Returns the list of task ids that belong to this Communication 
Group
         /// </summary>
-        public List<string> TaskIds { get; private set; } 
+        public List<string> TaskIds { get; private set; }
 
         /// <summary>
         /// Adds the Broadcast MPI operator to the communication group.
@@ -92,15 +95,26 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="spec">The specification that defines the Broadcast 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added 
Broadcast operator info</returns>
         public ICommunicationGroupDriver AddBroadcast<T>(
-            string operatorName, 
-            BroadcastOperatorSpec<T> spec)
+            string operatorName,
+            BroadcastOperatorSpec<T> spec,
+            TopologyTypes topologyType = TopologyTypes.Flat)
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the 
spec has been built.");
             }
 
-            ITopology<T> topology = new FlatTopology<T>(operatorName, 
_groupName, spec.SenderId, _driverId, spec);
+            ITopology<T> topology;
+
+            if (topologyType == TopologyTypes.Flat)
+            {
+                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec);
+            }
+            else
+            {
+                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec,
+                    _fanOut);
+            }
             _topologies[operatorName] = topology;
             _operatorSpecs[operatorName] = spec;
 
@@ -115,15 +129,26 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="spec">The specification that defines the Reduce 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Reduce 
operator info</returns>
         public ICommunicationGroupDriver AddReduce<T>(
-            string operatorName, 
-            ReduceOperatorSpec<T> spec)
+            string operatorName,
+            ReduceOperatorSpec<T> spec,
+            TopologyTypes topologyType = TopologyTypes.Flat)
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the 
spec has been built.");
             }
 
-            ITopology<T> topology = new FlatTopology<T>(operatorName, 
_groupName, spec.ReceiverId, _driverId, spec);
+            ITopology<T> topology;
+
+            if (topologyType == TopologyTypes.Flat)
+            {
+                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.ReceiverId, _driverId, spec);
+            }
+            else
+            {
+                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.ReceiverId, _driverId, spec,
+                    _fanOut);
+            }
             _topologies[operatorName] = topology;
             _operatorSpecs[operatorName] = spec;
 
@@ -137,14 +162,24 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <param name="operatorName">The name of the scatter operator</param>
         /// <param name="spec">The specification that defines the Scatter 
operator</param>
         /// <returns>The same CommunicationGroupDriver with the added Scatter 
operator info</returns>
-        public ICommunicationGroupDriver AddScatter<T>(string operatorName, 
ScatterOperatorSpec<T> spec)
+        public ICommunicationGroupDriver AddScatter<T>(string operatorName, 
ScatterOperatorSpec<T> spec, TopologyTypes topologyType = TopologyTypes.Flat)
         {
             if (_finalized)
             {
                 throw new IllegalStateException("Can't add operators once the 
spec has been built.");
             }
 
-            ITopology<T> topology = new FlatTopology<T>(operatorName, 
_groupName, spec.SenderId, _driverId, spec);
+            ITopology<T> topology; 
+
+            if (topologyType == TopologyTypes.Flat)
+            {
+                topology = new FlatTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec);
+            }
+            else
+            {
+                topology = new TreeTopology<T>(operatorName, _groupName, 
spec.SenderId, _driverId, spec,
+                    _fanOut);
+            }
             _topologies[operatorName] = topology;
             _operatorSpecs[operatorName] = spec;
 
@@ -245,14 +280,14 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         {
             var topology = _topologies[operatorName];
             MethodInfo info = topology.GetType().GetMethod("AddTask");
-            info.Invoke(topology, new[] { (object) taskId });
+            info.Invoke(topology, new[] { (object)taskId });
         }
 
         private IConfiguration GetOperatorConfiguration(string operatorName, 
string taskId)
         {
             var topology = _topologies[operatorName];
             MethodInfo info = 
topology.GetType().GetMethod("GetTaskConfiguration");
-            return (IConfiguration) info.Invoke(topology, new[] { (object) 
taskId });
+            return (IConfiguration)info.Invoke(topology, new[] { 
(object)taskId });
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
index 52eec91..40ca02c 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/MpiDriver.cs
@@ -55,6 +55,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         private readonly string _nameServerAddr;
         private readonly int _nameServerPort;
         private int _contextIds;
+        private int _fanOut;
 
         private readonly Dictionary<string, ICommunicationGroupDriver> 
_commGroups; 
         private readonly AvroConfigurationSerializer _configSerializer;
@@ -70,10 +71,12 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         public MpiDriver(
             [Parameter(typeof(MpiConfigurationOptions.DriverId))] string 
driverId,
             [Parameter(typeof(MpiConfigurationOptions.MasterTaskId))] string 
masterTaskId,
+            [Parameter(typeof(MpiConfigurationOptions.FanOut))] int fanOut,
             AvroConfigurationSerializer configSerializer)
         {
             _driverId = driverId;
             _contextIds = -1;
+            _fanOut = fanOut;
             MasterTaskId = masterTaskId;
 
             _configSerializer = configSerializer;
@@ -111,7 +114,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
                 throw new ArgumentException("Group Name already registered 
with MpiDriver");
             }
 
-            var commGroup = new CommunicationGroupDriver(groupName, _driverId, 
numTasks, _configSerializer);
+            var commGroup = new CommunicationGroupDriver(groupName, _driverId, 
numTasks, _fanOut, _configSerializer);
             _commGroups[groupName] = commGroup;
             return commGroup;
         }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
index e7ebf3c..97990bd 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/IReduceSender.cs
@@ -26,7 +26,7 @@ namespace Org.Apache.REEF.Network.Group.Operators
     public interface IReduceSender<T> : IMpiOperator<T>
     {
         /// <summary>
-        /// Sends data to the operator's ReduceReceiver to be aggregated.
+        /// Get reduced data from children, reduce with the data given, then 
sends reduced data to parent
         /// </summary>
         /// <param name="data">The data to send</param>
         void Send(T data);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
index 92821c8..d45fa24 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs
@@ -85,7 +85,12 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <returns>The incoming message</returns>
         public T Receive()
         {
-            return _topology.ReceiveFromParent();
+            var data = _topology.ReceiveFromParent();
+            if (_topology.HasChildren())
+            {
+                _topology.SendToChildren(data, MessageType.Data);
+            }
+            return data;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
index 1c27759..a2c7d93 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs
@@ -18,6 +18,7 @@
  */
 
 using System;
+using System.Collections.Generic;
 using System.Reactive;
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
@@ -37,6 +38,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
 
         private readonly ICommunicationGroupNetworkObserver _networkHandler;
         private readonly OperatorTopology<T> _topology;
+        private readonly IReduceFunction<T> _reduceFunction;
 
         /// <summary>
         /// Creates a new ReduceSender.
@@ -50,12 +52,13 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
             [Parameter(typeof(MpiConfigurationOptions.OperatorName))] string 
operatorName,
             
[Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string 
groupName,
             OperatorTopology<T> topology, 
-            ICommunicationGroupNetworkObserver networkHandler)
+            ICommunicationGroupNetworkObserver networkHandler,
+            IReduceFunction<T> reduceFunction)
         {
             OperatorName = operatorName;
             GroupName = groupName;
             Version = DefaultVersion;
-
+            _reduceFunction = reduceFunction;
             _networkHandler = networkHandler;
             _topology = topology;
             _topology.Initialize();
@@ -80,7 +83,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         public int Version { get; private set; }
 
         /// <summary>
-        /// Sends the data to the operator's ReduceReceiver to be aggregated.
+        /// Sends data to the operator's ReduceReceiver to be aggregated.
         /// </summary>
         /// <param name="data">The data to send</param>
         public void Send(T data)
@@ -90,7 +93,26 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
                 throw new ArgumentNullException("data");    
             }
 
-            _topology.SendToParent(data, MessageType.Data);
+            //middle notes
+            if (_topology.HasChildren())
+            {
+                var reducedValueOfChildren = 
_topology.ReceiveFromChildren(_reduceFunction);
+
+                var mergeddData = new List<T>();
+                mergeddData.Add(data);
+                if (reducedValueOfChildren != null)
+                {
+                    mergeddData.Add(reducedValueOfChildren);
+                }
+                T reducedValue = _reduceFunction.Reduce(mergeddData);
+
+                _topology.SendToParent(reducedValue, MessageType.Data);
+            }
+            else
+            {
+                //leaf node
+                _topology.SendToParent(data, MessageType.Data);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
index 8037f6f..6a9207d 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs
@@ -91,7 +91,9 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl
         /// <returns>The sublist of messages</returns>
         public List<T> Receive()
         {
-            return _topology.ReceiveListFromParent();
+            List<T> elements = _topology.ReceiveListFromParent();
+            _topology.ScatterToChildren(elements, MessageType.Data);
+            return elements;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
index d8e732f..fcec282 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs
@@ -45,15 +45,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
     /// <typeparam name="T">The message type</typeparam>
     public class OperatorTopology<T> : IObserver<GroupCommunicationMessage>
     {
-        private const int DefaultTimeout = 10000;
-        private const int RetryCount = 5;
-
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(OperatorTopology<>));
 
         private readonly string _groupName;
         private readonly string _operatorName;
         private readonly string _selfId;
         private string _driverId;
+        private readonly int _timeout;
+        private readonly int _retryCount;
 
         private readonly NodeStruct _parent;
         private readonly List<NodeStruct> _children;
@@ -81,6 +80,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             
[Parameter(typeof(MpiConfigurationOptions.CommunicationGroupName))] string 
groupName,
             [Parameter(typeof(TaskConfigurationOptions.Identifier))] string 
taskId,
             [Parameter(typeof(MpiConfigurationOptions.DriverId))] string 
driverId,
+            [Parameter(typeof(MpiConfigurationOptions.Timeout))] int timrout,
+            [Parameter(typeof(MpiConfigurationOptions.RetryCount))] int 
retryCount,
             [Parameter(typeof(MpiConfigurationOptions.TopologyRootTaskId))] 
string rootId,
             [Parameter(typeof(MpiConfigurationOptions.TopologyChildTaskIds))] 
ISet<string> childIds,
             NetworkService<GroupCommunicationMessage> networkService,
@@ -91,6 +92,8 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             _groupName = groupName;
             _selfId = taskId;
             _driverId = driverId;
+            _timeout = timrout;
+            _retryCount = retryCount;
             _codec = codec;
             _nameClient = networkService.NamingClient;
             _sender = sender;
@@ -101,18 +104,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             if (_selfId.Equals(rootId))
             {
                 _parent = null;
-                foreach (string childId in childIds)
-                {
-                    NodeStruct node = new NodeStruct(childId);
-                    _children.Add(node);
-                    _idToNodeMap[childId] = node;
-                }
             }
             else
             {
                 _parent = new NodeStruct(rootId);
                 _idToNodeMap[rootId] = _parent;
             }
+            foreach (string childId in childIds)
+            {
+                NodeStruct node = new NodeStruct(childId);
+                _children.Add(node);
+                _idToNodeMap[childId] = node;
+            }
         }
 
         /// <summary>
@@ -126,14 +129,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             {
                 if (_parent != null)
                 {
-                    WaitForTaskRegistration(_parent.Identifier, RetryCount);
+                    WaitForTaskRegistration(_parent.Identifier, _retryCount);
                 }
 
                 if (_children.Count > 0)
                 {
                     foreach (NodeStruct child in _children)
                     {
-                        WaitForTaskRegistration(child.Identifier, RetryCount);
+                        WaitForTaskRegistration(child.Identifier, _retryCount);
                     }
                 }
             }
@@ -212,7 +215,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             }
             if (_children.Count <= 0)
             {
-                throw new ArgumentException("Cannot scatter, no children 
available");
+                return;
             }
 
             int count = (int) Math.Ceiling(((double) messages.Count) / 
_children.Count);
@@ -344,13 +347,18 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
         {
         }
 
+        public bool HasChildren()
+        {
+            return _children.Count > 0;
+        }
+
         /// <summary>
         /// Get a node containing an incoming message.
         /// </summary>
         /// <returns>A NodeStruct with incoming data.</returns>
         private NodeStruct GetNodeWithData()
         {
-            CancellationTokenSource timeoutSource = new 
CancellationTokenSource(DefaultTimeout);
+            CancellationTokenSource timeoutSource = new 
CancellationTokenSource(_timeout);
 
             try
             {
@@ -480,4 +488,4 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl
             throw new IllegalStateException("Failed to initialize operator 
topology for node: " + identifier);
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
index 39b5a99..892c82c 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs
@@ -88,16 +88,19 @@ namespace Org.Apache.REEF.Network.Group.Topology
                     
GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
                     _rootId);
 
-            foreach (string tId in _nodes.Keys)
+            if (taskId.Equals(_rootId))
             {
-                if (!tId.Equals(_rootId))
+                foreach (string tId in _nodes.Keys)
                 {
-                    
confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>(
-                        
GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class,
-                        tId);
+                    if (!tId.Equals(_rootId))
+                    {
+                        
confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>(
+                            
GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class,
+                            tId);
+                    }
                 }
             }
-            
+
             if (OperatorSpec is BroadcastOperatorSpec<T>)
             {
                 BroadcastOperatorSpec<T> broadcastSpec = OperatorSpec as 
BroadcastOperatorSpec<T>;
@@ -171,25 +174,25 @@ namespace Org.Apache.REEF.Network.Group.Topology
 
         private void SetRootNode(string rootId)
         {
-            TaskNode rootNode = new TaskNode(_groupName, _operatorName, 
rootId, _driverId);
+            TaskNode rootNode = new TaskNode(_groupName, _operatorName, 
rootId, _driverId, true);
             _root = rootNode;
 
             foreach (TaskNode childNode in _nodes.Values)
             {
                 rootNode.AddChild(childNode);
-                childNode.SetParent(rootNode);
+                childNode.Parent = rootNode;
             }
         }
 
         private void AddChild(string childId)
         {
-            TaskNode childNode = new TaskNode(_groupName, _operatorName, 
childId, _driverId);
+            TaskNode childNode = new TaskNode(_groupName, _operatorName, 
childId, _driverId, false);
             _nodes[childId] = childNode;
 
             if (_root != null)
             {
                 _root.AddChild(childNode);
-                childNode.SetParent(_root);
+                childNode.Parent = _root;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs
index 65cdb5e..fe010a8 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs
@@ -32,32 +32,57 @@ namespace Org.Apache.REEF.Network.Group.Topology
         private string _driverId;
 
         private TaskNode _parent;
+        private TaskNode _successor;
+        private bool _isRoot;
+
         private readonly List<TaskNode> _children;
 
         public TaskNode(
             string groupName, 
             string operatorName, 
             string taskId, 
-            string driverId)
+            string driverId,
+            bool isRoot)
         {
             _groupName = groupName;
             _operatorName = operatorName;
             _taskId = taskId;
             _driverId = driverId;
+            _isRoot = isRoot;
 
             _children = new List<TaskNode>();
         }
 
-        public string TaskId { get; private set; }
+        /// <summary>
+        /// Gets the identifier of the task this node represents, as passed to the constructor.
+        /// </summary>
+        public string TaskId
+        {
+            get { return _taskId; } 
+            private set { _taskId = value;  }
+        }
+
+        /// <summary>
+        /// Gets or sets the node that follows this one. TreeTopology points it at the node
+        /// attached right after this one and moves on to it once the current parent has
+        /// reached its fan-out. Null until a topology assigns it.
+        /// </summary>
+        public TaskNode Successor
+        {
+            get { return _successor; }
+            set { _successor = value; }
+        }
+
+        /// <summary>
+        /// Gets or sets the parent of this node. Null until a topology assigns one.
+        /// </summary>
+        public TaskNode Parent
+        {
+            get { return _parent; }
+            set { _parent = value; }
+        }
 
         public void AddChild(TaskNode child)
         {
             _children.Add(child);
         }
 
-        public void SetParent(TaskNode parent)
-        {
-            _parent = parent;
+        public int GetNumberOfChildren() {
+            return _children.Count;
         }
+
+        /// <summary>
+        /// Gets the children added through AddChild.
+        /// </summary>
+        /// <returns>The node's own child list (not a copy), so changes made through it affect the node</returns>
+        public IList<TaskNode> GetChildren()
+        {
+            return _children;
+        } 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Topology/TopologyTypes.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TopologyTypes.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TopologyTypes.cs
new file mode 100644
index 0000000..fdf2c52
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TopologyTypes.cs
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+    /// <summary>
+    /// The kinds of topology that can be requested for a group communication operator.
+    /// </summary>
+    public enum TopologyTypes
+    {
+        // NOTE(review): presumably selects FlatTopology - confirm against the driver code.
+        Flat = 0,
+        // NOTE(review): presumably selects TreeTopology - confirm against the driver code.
+        Tree = 1
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs 
b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
new file mode 100644
index 0000000..50b2636
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.Network.Group.Topology
+{
+    public class TreeTopology<T> : ITopology<T>
+    {
+        private readonly string _groupName;
+        private readonly string _operatorName;
+
+        private readonly string _rootId;
+        private readonly string _driverId;
+
+        private readonly Dictionary<string, TaskNode> _nodes;
+        private TaskNode _root;
+        private TaskNode _logicalRoot;
+        private TaskNode _prev;
+
+        private int _fanOut;
+
+        /// <summary>
+        /// Creates a new TreeTopology.
+        /// </summary>
+        /// <param name="operatorName">The operator name</param>
+        /// <param name="groupName">The name of the topology's CommunicationGroup</param>
+        /// <param name="rootId">The root Task identifier</param>
+        /// <param name="driverId">The driver identifier</param>
+        /// <param name="operatorSpec">The operator specification</param>
+        /// <param name="fanOut">The maximum number of children for a tree node; must be at least 1</param>
+        /// <exception cref="ArgumentOutOfRangeException">fanOut is less than 1</exception>
+        public TreeTopology(
+            string operatorName,
+            string groupName,
+            string rootId,
+            string driverId,
+            IOperatorSpec<T> operatorSpec,
+            int fanOut)
+        {
+            // A node that already has fanOut children hands new tasks over to its
+            // successor. With fanOut < 1 the root counts as full before it has any
+            // child, and therefore before it has a successor, so attaching the first
+            // child would dereference null. Reject such values up front.
+            if (fanOut < 1)
+            {
+                throw new ArgumentOutOfRangeException(
+                    "fanOut",
+                    fanOut,
+                    "The fan-out of a tree topology must be at least 1");
+            }
+
+            _groupName = groupName;
+            _operatorName = operatorName;
+            _rootId = rootId;
+            _driverId = driverId;
+
+            OperatorSpec = operatorSpec;
+            _fanOut = fanOut;
+
+            _nodes = new Dictionary<string, TaskNode>();
+        }
+
+        /// <summary>
+        /// Gets or sets the operator specification. It is initialized from the constructor
+        /// argument; GetTaskConfiguration reads its codec and uses its concrete type to
+        /// choose the operator implementation to bind.
+        /// </summary>
+        public IOperatorSpec<T> OperatorSpec { get; set; }
+
+        /// <summary>
+        /// Gets the task configuration for the operator topology.
+        /// </summary>
+        /// <remarks>
+        /// The configuration binds the codec of the operator spec, the id of the task's
+        /// parent (under TopologyRootTaskId; the task's own id when it has no parent),
+        /// the ids of its direct children (under TopologyChildTaskIds) and the operator
+        /// implementation matching the spec type and the task's role in it.
+        /// </remarks>
+        /// <param name="taskId">The task identifier</param>
+        /// <returns>The task configuration</returns>
+        /// <exception cref="ArgumentException">taskId is null or has not been added to the topology</exception>
+        /// <exception cref="NotSupportedException">The operator spec is not a broadcast, reduce or scatter spec</exception>
+        public IConfiguration GetTaskConfiguration(string taskId)
+        {
+            if (taskId == null)
+            {
+                throw new ArgumentException("TaskId is null when GetTaskConfiguration");
+            }
+
+            // GetTaskNode throws for an unknown task, so its result is never null here.
+            TaskNode selfTaskNode = GetTaskNode(taskId);
+
+            // A node without a parent is configured with its own id as the parent id.
+            TaskNode parent = selfTaskNode.Parent;
+            string parentId = parent == null ? selfTaskNode.TaskId : parent.TaskId;
+
+            var confBuilder = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindImplementation(typeof(ICodec<T>), OperatorSpec.Codec.GetType())
+                .BindNamedParameter<MpiConfigurationOptions.TopologyRootTaskId, string>(
+                    GenericType<MpiConfigurationOptions.TopologyRootTaskId>.Class,
+                    parentId);
+
+            // Register every direct child of this task.
+            foreach (TaskNode childNode in selfTaskNode.GetChildren())
+            {
+                confBuilder.BindSetEntry<MpiConfigurationOptions.TopologyChildTaskIds, string>(
+                    GenericType<MpiConfigurationOptions.TopologyChildTaskIds>.Class,
+                    childNode.TaskId);
+            }
+
+            // Resolve the spec type once per kind; the checks below keep the original
+            // broadcast, reduce, scatter order.
+            BroadcastOperatorSpec<T> broadcastSpec = OperatorSpec as BroadcastOperatorSpec<T>;
+            ReduceOperatorSpec<T> reduceSpec = OperatorSpec as ReduceOperatorSpec<T>;
+            ScatterOperatorSpec<T> scatterSpec = OperatorSpec as ScatterOperatorSpec<T>;
+
+            if (broadcastSpec != null)
+            {
+                if (taskId.Equals(broadcastSpec.SenderId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastSender<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<BroadcastReceiver<T>>.Class);
+                }
+            }
+            else if (reduceSpec != null)
+            {
+                // Both reduce roles get the reduce function bound.
+                confBuilder.BindImplementation(typeof(IReduceFunction<T>), reduceSpec.ReduceFunction.GetType());
+
+                if (taskId.Equals(reduceSpec.ReceiverId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceReceiver<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ReduceSender<T>>.Class);
+                }
+            }
+            else if (scatterSpec != null)
+            {
+                if (taskId.Equals(scatterSpec.SenderId))
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterSender<T>>.Class);
+                }
+                else
+                {
+                    confBuilder.BindImplementation(GenericType<IMpiOperator<T>>.Class, GenericType<ScatterReceiver<T>>.Class);
+                }
+            }
+            else
+            {
+                throw new NotSupportedException("Spec type not supported");
+            }
+
+            return confBuilder.Build();
+        }
+
+        /// <summary>
+        /// Adds a task to the topology. The task whose id equals the root id becomes
+        /// the root node; every other task is added as a child node.
+        /// </summary>
+        /// <param name="taskId">The identifier of the task to add</param>
+        /// <exception cref="ArgumentNullException">taskId is null or empty</exception>
+        /// <exception cref="ArgumentException">The task has already been added</exception>
+        public void AddTask(string taskId)
+        {
+            if (string.IsNullOrEmpty(taskId))
+            {
+                throw new ArgumentNullException("taskId");
+            }
+
+            bool alreadyKnown = _nodes.ContainsKey(taskId);
+            if (alreadyKnown)
+            {
+                throw new ArgumentException("Task has already been added to the topology");
+            }
+
+            if (!taskId.Equals(_rootId))
+            {
+                AddChild(taskId);
+                return;
+            }
+
+            SetRootNode(_rootId);
+        }
+
+        /// <summary>
+        /// Looks up the node registered for a task.
+        /// </summary>
+        /// <param name="taskId">The task identifier</param>
+        /// <returns>The node stored for the task</returns>
+        /// <exception cref="ArgumentException">No node is registered for the task</exception>
+        private TaskNode GetTaskNode(string taskId)
+        {
+            TaskNode found;
+            if (!_nodes.TryGetValue(taskId, out found))
+            {
+                throw new ArgumentException("cannot find task node in the nodes.");
+            }
+
+            return found;
+        }
+
+        /// <summary>
+        /// Creates a non-root node for the task and registers it. The node is attached
+        /// to the tree right away when the root is already known; otherwise it is only
+        /// stored, and SetRootNode attaches it once the root is added.
+        /// </summary>
+        /// <param name="taskId">The identifier of the child task</param>
+        private void AddChild(string taskId)
+        {
+            TaskNode child = new TaskNode(_groupName, _operatorName, taskId, _driverId, false);
+
+            bool rootKnown = _logicalRoot != null;
+            if (rootKnown)
+            {
+                AddTaskNode(child);
+            }
+
+            _nodes[taskId] = child;
+        }
+
+        /// <summary>
+        /// Creates the root node, attaches every node registered so far below it and
+        /// then registers the root itself.
+        /// </summary>
+        /// <param name="rootId">The identifier of the root task</param>
+        private void SetRootNode(string rootId)
+        {
+            // The new root is at once the tree root, the node currently receiving
+            // children and the last node in the successor chain.
+            _root = _logicalRoot = _prev = new TaskNode(_groupName, _operatorName, rootId, _driverId, true);
+
+            // The root is registered only after the loop, so it is never attached to itself.
+            foreach (TaskNode pending in _nodes.Values)
+            {
+                AddTaskNode(pending);
+            }
+
+            _nodes[rootId] = _root;
+        }
+
+        /// <summary>
+        /// Attaches a node below the node currently receiving children. When that node
+        /// already has fanOut children, its successor takes over first. The attached
+        /// node is then appended to the successor chain.
+        /// </summary>
+        /// <param name="node">The node to attach</param>
+        private void AddTaskNode(TaskNode node)
+        {
+            bool parentIsFull = _logicalRoot.GetNumberOfChildren() >= _fanOut;
+            if (parentIsFull)
+            {
+                // NOTE(review): assumes the successor exists, which holds for fanOut >= 1.
+                _logicalRoot = _logicalRoot.Successor;
+            }
+
+            TaskNode parent = _logicalRoot;
+            node.Parent = parent;
+            parent.AddChild(node);
+
+            // Append the node to the chain that later parents are taken from.
+            TaskNode previous = _prev;
+            previous.Successor = node;
+            _prev = node;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj 
b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index e7aebdb..5537863 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -93,6 +93,8 @@ under the License.
     <Compile Include="Group\Topology\FlatTopology.cs" />
     <Compile Include="Group\Topology\ITopology.cs" />
     <Compile Include="Group\Topology\TaskNode.cs" />
+    <Compile Include="Group\Topology\TopologyTypes.cs" />
+    <Compile Include="Group\Topology\TreeTopology.cs" />
     <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" />
     <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" />
     <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" />

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
index 1719671..947dfdc 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
@@ -57,6 +57,7 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
         public BroadcastReduceDriver(
             [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
             [Parameter(typeof(MpiTestConfig.NumIterations))] int numIterations,
+            [Parameter(typeof(MpiTestConfig.FanOut))] int fanOut,
             AvroConfigurationSerializer confSerializer)
         {
             Identifier = "BroadcastStartHandler";
@@ -66,6 +67,7 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
             _mpiDriver = new MpiDriver(
                 MpiTestConstants.DriverId,
                 MpiTestConstants.MasterTaskId,
+                fanOut,
                 confSerializer);
 
             _commGroup = _mpiDriver.NewCommunicationGroup(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
index c74c923..17b28f4 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
@@ -64,6 +64,9 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
                 .BindNamedParameter<MpiTestConfig.NumIterations, int>(
                     GenericType<MpiTestConfig.NumIterations>.Class,
                     
MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.FanOut, int>(
+                    GenericType<MpiTestConfig.FanOut>.Class,
+                    
MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
                     GenericType<MpiTestConfig.NumEvaluators>.Class,
                     numTasks.ToString(CultureInfo.InvariantCulture))

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
index 9450e20..6d93cab 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
@@ -32,5 +32,10 @@ namespace Org.Apache.REEF.Tests.Functional.MPI
         public class NumEvaluators : Name<int>
         {
         }
+
+        /// <summary>
+        /// Named parameter for the fan-out ("tree width") injected into the test drivers.
+        /// </summary>
+        [NamedParameter("tree width")]
+        public class FanOut : Name<int>
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
index 5cff899..403990f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
@@ -29,5 +29,6 @@ namespace Org.Apache.REEF.Tests.Functional.MPI
         public const string MasterTaskId = "MasterTask";
         public const string SlaveTaskId = "SlaveTask-";
         public const int NumIterations = 10;
+        public const int FanOut = 2;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
index 8697911..e563534 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/MasterTask.cs
@@ -50,8 +50,7 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
         public byte[] Call(byte[] memento)
         {
             List<int> data = Enumerable.Range(1, 100).ToList();
-            List<string> order = GetScatterOrder();
-            _scatterSender.Send(data, order);
+            _scatterSender.Send(data);
 
             int sum = _sumReducer.Reduce();
             _logger.Log(Level.Info, "Received sum: {0}", sum);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
index 81cf745..e16a917 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceDriver.cs
@@ -30,6 +30,7 @@ using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.Group.Topology;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
@@ -53,6 +54,7 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
         [Inject]
         public ScatterReduceDriver(
             [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof(MpiTestConfig.FanOut))] int fanOut,
             AvroConfigurationSerializer confSerializer)
         {
             Identifier = "BroadcastStartHandler";
@@ -61,6 +63,7 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
             _mpiDriver = new MpiDriver(
                 MpiTestConstants.DriverId,
                 MpiTestConstants.MasterTaskId,
+                fanOut,
                 confSerializer);
 
             _commGroup = _mpiDriver.NewCommunicationGroup(
@@ -70,7 +73,8 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
                         MpiTestConstants.ScatterOperatorName,
                         new ScatterOperatorSpec<int>(
                             MpiTestConstants.MasterTaskId,
-                            new IntCodec()))
+                            new IntCodec()),
+                            TopologyTypes.Tree)
                     .AddReduce(
                         MpiTestConstants.ReduceOperatorName,
                         new ReduceOperatorSpec<int>(

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
index 1a0eef5..d03036c 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/ScatterReduceTest/ScatterReduceTest.cs
@@ -64,6 +64,9 @@ namespace 
Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
                 .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
                     GenericType<MpiTestConfig.NumEvaluators>.Class,
                     numTasks.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.FanOut, int>(
+                    GenericType<MpiTestConfig.FanOut>.Class,
+                    
MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture))
                 .Build();
                     
             HashSet<string> appDlls = new HashSet<string>();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/aa36ade1/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs 
b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
index a755892..a5297f6 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTests.cs
@@ -98,9 +98,9 @@ namespace Org.Apache.REEF.Tests.Network
             string masterTaskId = "task0";
             string driverId = "Driver Id";
             int numTasks = 3;
-            int value = 1;
+            int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
 
             ICommunicationGroupDriver commGroup = 
mpiDriver.NewCommunicationGroup(
                 groupName,
@@ -156,14 +156,14 @@ namespace Org.Apache.REEF.Tests.Network
             IBroadcastReceiver<int> broadcastReceiver2 = 
commGroups[2].GetBroadcastReceiver<int>(broadcastOperatorName);
             IReduceSender<int> triangleNumberSender2 = 
commGroups[2].GetReduceSender<int>(reduceOperatorName);
 
-            for (int i = 1; i <= 10; i++)
+            for (int j = 1; j <= 10; j++)
             {
-                broadcastSender.Send(i);
+                broadcastSender.Send(j);
 
                 int n1 = broadcastReceiver1.Receive();
                 int n2 = broadcastReceiver2.Receive();
-                Assert.AreEqual(i, n1);
-                Assert.AreEqual(i, n2);
+                Assert.AreEqual(j, n1);
+                Assert.AreEqual(j, n2);
 
                 int triangleNum1 = TriangleNumber(n1);
                 triangleNumberSender1.Send(triangleNum1);
@@ -171,7 +171,7 @@ namespace Org.Apache.REEF.Tests.Network
                 triangleNumberSender2.Send(triangleNum2);
 
                 int sum = sumReducer.Reduce();
-                int expected = TriangleNumber(i) * (numTasks - 1);
+                int expected = TriangleNumber(j) * (numTasks - 1);
                 Assert.AreEqual(sum, expected);
             }
         }
@@ -185,9 +185,9 @@ namespace Org.Apache.REEF.Tests.Network
             string masterTaskId = "task0";
             string driverId = "Driver Id";
             int numTasks = 5;
-            int value = 1;
+            int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
 
             ICommunicationGroupDriver commGroup = 
mpiDriver.NewCommunicationGroup(
                 groupName,
@@ -260,10 +260,10 @@ namespace Org.Apache.REEF.Tests.Network
 
             sender.Send(data, order);
 
-            ScatterReceiveReduce(receiver1, sumSender1);
-            ScatterReceiveReduce(receiver2, sumSender2);
-            ScatterReceiveReduce(receiver3, sumSender3);
             ScatterReceiveReduce(receiver4, sumSender4);
+            ScatterReceiveReduce(receiver3, sumSender3);
+            ScatterReceiveReduce(receiver2, sumSender2);
+            ScatterReceiveReduce(receiver1, sumSender1);
 
             int sum = sumReducer.Reduce();
 
@@ -291,20 +291,16 @@ namespace Org.Apache.REEF.Tests.Network
             string operatorName = "broadcast";
             string masterTaskId = "task0";
             string driverId = "Driver Id";
-            int numTasks = 3;
+            int numTasks = 10;
             int value = 1337;
+            int fanOut = 3;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
 
             var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
                 .AddBroadcast(operatorName, new 
BroadcastOperatorSpec<int>(masterTaskId, new IntCodec()))
                 .Build();
 
-            //for (int i = 0; i < numTasks; i++)
-            //{
-            //    nameServer.Register("task" + i, new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), 21));
-            //}
-
             List<IConfiguration> partialConfigs = new List<IConfiguration>();
             for (int i = 0; i < numTasks; i++)
             {
@@ -358,8 +354,9 @@ namespace Org.Apache.REEF.Tests.Network
             int value1 = 1337;
             int value2 = 42;
             int value3 = 99;
+            int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
 
             var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
                 .AddBroadcast(operatorName, new 
BroadcastOperatorSpec<int>(masterTaskId, new IntCodec()))
@@ -418,22 +415,15 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestReduceOperator()
         {
-            //NameServer nameServer = new NameServer(0);
-
             string groupName = "group1";
             string operatorName = "reduce";
             int numTasks = 4;
-            IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", 2, new 
AvroConfigurationSerializer());
 
             var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
                 .AddReduce(operatorName, new ReduceOperatorSpec<int>("task0", 
new IntCodec(), new SumFunction()))
                 .Build();
 
-            //for (int i = 0; i < numTasks; i++)
-            //{
-            //    nameServer.Register("task" + i, new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), 21));
-            //}
-
             List<IConfiguration> partialConfigs = new List<IConfiguration>();
             for (int i = 0; i < numTasks; i++)
             {
@@ -472,9 +462,9 @@ namespace Org.Apache.REEF.Tests.Network
             Assert.IsNotNull(sender2);
             Assert.IsNotNull(sender3);
 
+            sender3.Send(5);
             sender1.Send(1);
             sender2.Send(3);
-            sender3.Send(5);
 
             Assert.AreEqual(9, receiver.Reduce());
         }
@@ -485,7 +475,7 @@ namespace Org.Apache.REEF.Tests.Network
             string groupName = "group1";
             string operatorName = "reduce";
             int numTasks = 4;
-            IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", 2, new 
AvroConfigurationSerializer());
 
             var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
                 .AddReduce(operatorName, new ReduceOperatorSpec<int>("task0", 
new IntCodec(), new SumFunction()))
@@ -529,44 +519,38 @@ namespace Org.Apache.REEF.Tests.Network
             Assert.IsNotNull(sender2);
             Assert.IsNotNull(sender3);
 
+            sender3.Send(5);
             sender1.Send(1);
             sender2.Send(3);
-            sender3.Send(5);
             Assert.AreEqual(9, receiver.Reduce());
 
+            sender3.Send(6);
             sender1.Send(2);
             sender2.Send(4);
-            sender3.Send(6);
             Assert.AreEqual(12, receiver.Reduce());
 
+            sender3.Send(9);
             sender1.Send(3);
             sender2.Send(6);
-            sender3.Send(9);
             Assert.AreEqual(18, receiver.Reduce());
         }
 
         [TestMethod]
         public void TestScatterOperator()
         {
-            //NameServer nameServer = new NameServer(0);
-
             string groupName = "group1";
             string operatorName = "scatter";
             string masterTaskId = "task0";
             string driverId = "Driver Id";
             int numTasks = 5;
+            int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
 
             var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
                 .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()))
                 .Build();
 
-            //for (int i = 0; i < numTasks; i++)
-            //{
-            //    nameServer.Register("task" + i, new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), 21));
-            //}
-
             List<IConfiguration> partialConfigs = new List<IConfiguration>();
             for (int i = 0; i < numTasks; i++)
             {
@@ -620,25 +604,19 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestScatterOperator2()
         {
-            //NameServer nameServer = new NameServer(0);
-
             string groupName = "group1";
             string operatorName = "scatter";
             string masterTaskId = "task0";
             string driverId = "Driver Id";
             int numTasks = 5;
+            int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
 
             var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
                 .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()))
                 .Build();
 
-            //for (int i = 0; i < numTasks; i++)
-            //{
-            //    nameServer.Register("task" + i, new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), 21));
-            //}
-
             List<IConfiguration> partialConfigs = new List<IConfiguration>();
             for (int i = 0; i < numTasks; i++)
             {
@@ -703,25 +681,19 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestScatterOperator3()
         {
-            //NameServer nameServer = new NameServer(0);
-
             string groupName = "group1";
             string operatorName = "scatter";
             string masterTaskId = "task0";
             string driverId = "Driver Id";
             int numTasks = 4;
+            int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
 
             var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
                 .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()))
                 .Build();
 
-            //for (int i = 0; i < numTasks; i++)
-            //{
-            //    nameServer.Register("task" + i, new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), 21));
-            //}
-
             List<IConfiguration> partialConfigs = new List<IConfiguration>();
             for (int i = 0; i < numTasks; i++)
             {
@@ -783,25 +755,19 @@ namespace Org.Apache.REEF.Tests.Network
         [TestMethod]
         public void TestScatterOperator4()
         {
-            //NameServer nameServer = new NameServer(0);
-
             string groupName = "group1";
             string operatorName = "scatter";
             string masterTaskId = "task0";
             string driverId = "Driver Id";
             int numTasks = 4;
+            int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, new 
AvroConfigurationSerializer());
+            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
 
             var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
                 .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()))
                 .Build();
 
-            //for (int i = 0; i < numTasks; i++)
-            //{
-            //    nameServer.Register("task" + i, new 
IPEndPoint(IPAddress.Parse("127.0.0.1"), 21));
-            //}
-
             List<IConfiguration> partialConfigs = new List<IConfiguration>();
             for (int i = 0; i < numTasks; i++)
             {
@@ -895,7 +861,7 @@ namespace Org.Apache.REEF.Tests.Network
                 handler, new StringIdentifierFactory(), new 
GroupCommunicationMessageCodec());
         }
 
-        private GroupCommunicationMessage CreateGcm(string message, string 
from, string to)
+       private GroupCommunicationMessage CreateGcm(string message, string 
from, string to)
         {
             byte[] data = Encoding.UTF8.GetBytes(message);
             return new GroupCommunicationMessage("g1", "op1", from, to, data, 
MessageType.Data);
@@ -927,4 +893,4 @@ namespace Org.Apache.REEF.Tests.Network
             }
         }
     }
-}
+}
\ No newline at end of file

Reply via email to