Repository: incubator-reef
Updated Branches:
  refs/heads/master 860c2fce5 -> 871071203


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/87107120/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs 
b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
index 9cdda9c..e74c506 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs
@@ -17,23 +17,13 @@
  * under the License.
  */
 
-using System;
 using System.Collections.Generic;
 using System.Linq;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Operators.Impl;
-using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Network.Group.Topology;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Formats;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.Tests.Network
@@ -64,40 +54,17 @@ namespace Org.Apache.REEF.Tests.Network
             string groupName = "group1";
             string operatorName = "reduce";
             int numTasks = 10;
+            string driverId = "driverId";
+            string masterTaskId = "task0";
             int fanOut = 3;
-            IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", fanOut, 
new AvroConfigurationSerializer());
-
-            var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
-                .AddReduce(operatorName, new ReduceOperatorSpec<int>("task0", 
new IntCodec(), new SumFunction()), TopologyTypes.Tree)
-                .Build();
-
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
-                        .Build())
-                    .Build();
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
 
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-            IList<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration taskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = Configurations.Merge(taskConfig, 
partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+                .AddReduce(operatorName, masterTaskId, new IntCodec(), new 
SumFunction(), TopologyTypes.Tree)
+                .Build();
 
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             IReduceReceiver<int> receiver = 
commGroups[0].GetReduceReceiver<int>(operatorName);
             IReduceSender<int> sender1 = 
commGroups[1].GetReduceSender<int>(operatorName);
@@ -147,40 +114,13 @@ namespace Org.Apache.REEF.Tests.Network
             int value3 = 99;
             int fanOut = 3;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
-                .AddBroadcast(operatorName, new 
BroadcastOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree)
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+                .AddBroadcast(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
                 .Build();
 
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, 
GenericType<GroupCommunicationTreeTopologyTests.MyTask>.Class)
-                        .Build())
-                    .Build();
-
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
-
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
-            IList<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration mpiTaskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = 
Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
-
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             IBroadcastSender<int> sender = 
commGroups[0].GetBroadcastSender<int>(operatorName);
             IBroadcastReceiver<int> receiver1 = 
commGroups[1].GetBroadcastReceiver<int>(operatorName);
@@ -250,53 +190,23 @@ namespace Org.Apache.REEF.Tests.Network
             int numTasks = 10;
             int fanOut = 3;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            ICommunicationGroupDriver commGroup = 
mpiDriver.NewCommunicationGroup(
-                groupName,
-                numTasks)
-                .AddBroadcast(
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+                .AddBroadcast<int>(
                     broadcastOperatorName,
-                    new BroadcastOperatorSpec<int>(
                     masterTaskId,
-                    new IntCodec()), 
+                    new IntCodec(), 
                     TopologyTypes.Tree)
-                .AddReduce(
+                .AddReduce<int>(
                     reduceOperatorName,
-                    new ReduceOperatorSpec<int>(
                     masterTaskId,
                     new IntCodec(),
-                    new SumFunction()),
+                    new SumFunction(),
                     TopologyTypes.Tree)
                 .Build();
 
-            List<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
-                        .Build())
-                    .Build();
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
-
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration mpiTaskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = 
Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
-
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             //for master task
             IBroadcastSender<int> broadcastSender = 
commGroups[0].GetBroadcastSender<int>(broadcastOperatorName);
@@ -352,35 +262,35 @@ namespace Org.Apache.REEF.Tests.Network
                 Assert.AreEqual(i, n8);
                 Assert.AreEqual(i, n9);
 
-                int triangleNum9 = TriangleNumber(n9);
+                int triangleNum9 = GroupCommunicationTests.TriangleNumber(n9);
                 triangleNumberSender9.Send(triangleNum9);
 
-                int triangleNum8 = TriangleNumber(n8);
+                int triangleNum8 = GroupCommunicationTests.TriangleNumber(n8);
                 triangleNumberSender8.Send(triangleNum8);
 
-                int triangleNum7 = TriangleNumber(n7);
+                int triangleNum7 = GroupCommunicationTests.TriangleNumber(n7);
                 triangleNumberSender7.Send(triangleNum7);
 
-                int triangleNum6 = TriangleNumber(n6);
+                int triangleNum6 = GroupCommunicationTests.TriangleNumber(n6);
                 triangleNumberSender6.Send(triangleNum6);
 
-                int triangleNum5 = TriangleNumber(n5);
+                int triangleNum5 = GroupCommunicationTests.TriangleNumber(n5);
                 triangleNumberSender5.Send(triangleNum5);
 
-                int triangleNum4 = TriangleNumber(n4);
+                int triangleNum4 = GroupCommunicationTests.TriangleNumber(n4);
                 triangleNumberSender4.Send(triangleNum4);
 
-                int triangleNum3 = TriangleNumber(n3);
+                int triangleNum3 = GroupCommunicationTests.TriangleNumber(n3);
                 triangleNumberSender3.Send(triangleNum3);
 
-                int triangleNum2 = TriangleNumber(n2);
+                int triangleNum2 = GroupCommunicationTests.TriangleNumber(n2);
                 triangleNumberSender2.Send(triangleNum2);
 
-                int triangleNum1 = TriangleNumber(n1);
+                int triangleNum1 = GroupCommunicationTests.TriangleNumber(n1);
                 triangleNumberSender1.Send(triangleNum1);
 
                 int sum = sumReducer.Reduce();
-                int expected = TriangleNumber(i) * (numTasks - 1);
+                int expected = GroupCommunicationTests.TriangleNumber(i) * 
(numTasks - 1);
                 Assert.AreEqual(sum, expected);
             }
         }
@@ -395,40 +305,13 @@ namespace Org.Apache.REEF.Tests.Network
             int numTasks = 5;
             int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
-                .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree)
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
                 .Build();
 
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, 
GenericType<GroupCommunicationTreeTopologyTests.MyTask>.Class)
-                        .Build())
-                    .Build();
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
-
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
-            List<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
-
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration mpiTaskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = 
Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
-
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -467,40 +350,13 @@ namespace Org.Apache.REEF.Tests.Network
             int numTasks = 5;
             int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
-                .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree)
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
                 .Build();
 
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
-                        .Build())
-                    .Build();
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
-
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
-            List<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
-
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration mpiTaskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = 
Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
-
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -548,40 +404,13 @@ namespace Org.Apache.REEF.Tests.Network
             int numTasks = 4;
             int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
-                .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree)
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
                 .Build();
 
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
-                        .Build())
-                    .Build();
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
-
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
-            List<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
-
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration mpiTaskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = 
Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
-
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -626,40 +455,13 @@ namespace Org.Apache.REEF.Tests.Network
             int numTasks = 4;
             int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
-                .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree)
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
                 .Build();
 
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
-                        .Build())
-                    .Build();
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
-
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
-            List<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
-
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration mpiTaskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = 
Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
-
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -705,40 +507,13 @@ namespace Org.Apache.REEF.Tests.Network
             int numTasks = 6;
             int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            var commGroup = mpiDriver.NewCommunicationGroup(groupName, 
numTasks)
-                .AddScatter(operatorName, new 
ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree)
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+                .AddScatter(operatorName, masterTaskId, new IntCodec(), 
TopologyTypes.Tree)
                 .Build();
 
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
-                        .Build())
-                    .Build();
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
-
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
-            List<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
-
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration mpiTaskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = 
Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
-
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(operatorName);
             IScatterReceiver<int> receiver1 = 
commGroups[1].GetScatterReceiver<int>(operatorName);
@@ -796,52 +571,22 @@ namespace Org.Apache.REEF.Tests.Network
             int numTasks = 5;
             int fanOut = 2;
 
-            IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, 
fanOut, new AvroConfigurationSerializer());
+            var mpiDriver = 
GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, 
groupName, fanOut, numTasks);
 
-            ICommunicationGroupDriver commGroup = 
mpiDriver.NewCommunicationGroup(
-                groupName,
-                numTasks)
-                .AddScatter(
+            ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup
+              .AddScatter(
                     scatterOperatorName,
-                    new ScatterOperatorSpec<int>(
-                        masterTaskId,
-                        new IntCodec()), TopologyTypes.Tree)
+                    masterTaskId,
+                    new IntCodec(), 
+                    TopologyTypes.Tree)
                 .AddReduce(
                     reduceOperatorName,
-                    new ReduceOperatorSpec<int>(
-                        masterTaskId,
-                        new IntCodec(),
-                        new SumFunction()), TopologyTypes.Tree)
-                .Build();
-
-            List<IConfiguration> partialConfigs = new List<IConfiguration>();
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration partialTaskConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                    TaskConfiguration.ConfigurationModule
-                        .Set(TaskConfiguration.Identifier, taskId)
-                        .Set(TaskConfiguration.Task, GenericType<MyTask>.Class)
-                        .Build())
-                    .Build();
-                commGroup.AddTask(taskId);
-                partialConfigs.Add(partialTaskConfig);
-            }
-
-            IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration();
-
-            List<ICommunicationGroupClient> commGroups = new 
List<ICommunicationGroupClient>();
-
-            for (int i = 0; i < numTasks; i++)
-            {
-                string taskId = "task" + i;
-                IConfiguration mpiTaskConfig = 
mpiDriver.GetMpiTaskConfiguration(taskId);
-                IConfiguration mergedConf = 
Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig);
-                IInjector injector = 
TangFactory.GetTang().NewInjector(mergedConf);
+                    masterTaskId,
+                    new IntCodec(),
+                    new SumFunction(), 
+                    TopologyTypes.Tree).Build();
 
-                IMpiClient mpiClient = injector.GetInstance<IMpiClient>();
-                commGroups.Add(mpiClient.GetCommunicationGroup(groupName));
-            }
+            var commGroups = 
GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, 
commGroup);
 
             IScatterSender<int> sender = 
commGroups[0].GetScatterSender<int>(scatterOperatorName);
             IReduceReceiver<int> sumReducer = 
commGroups[0].GetReduceReceiver<int>(reduceOperatorName);
@@ -889,43 +634,5 @@ namespace Org.Apache.REEF.Tests.Network
             int sum = sumReducer.Reduce();
             Assert.AreEqual(sum, 6325);
         }
-
-        private static void ScatterReceiveReduce(IScatterReceiver<int> 
receiver, IReduceSender<int> sumSender)
-        {
-            List<int> data1 = receiver.Receive();
-            int sum1 = data1.Sum();
-            sumSender.Send(sum1);
-        }
-
-        private int TriangleNumber(int n)
-        {
-            return Enumerable.Range(1, n).Sum();
-        }
-
-        private class SumFunction : IReduceFunction<int>
-        {
-            [Inject]
-            public SumFunction()
-            {
-            }
-
-            public int Reduce(IEnumerable<int> elements)
-            {
-                return elements.Sum();
-            }
-        }
-
-        private class MyTask : ITask
-        {
-            public void Dispose()
-            {
-                throw new NotImplementedException();
-            }
-
-            public byte[] Call(byte[] memento)
-            {
-                throw new NotImplementedException();
-            }
-        }
     }
 }

Reply via email to