Repository: incubator-reef Updated Branches: refs/heads/master 860c2fce5 -> 871071203
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/87107120/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs index 9cdda9c..e74c506 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Network/GroupCommunicationTreeTopologyTests.cs @@ -17,23 +17,13 @@ * under the License. */ -using System; using System.Collections.Generic; using System.Linq; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Network.Group.Driver; -using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; -using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Network.Group.Topology; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Implementations.Configuration; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote.Impl; namespace Org.Apache.REEF.Tests.Network @@ -64,40 +54,17 @@ namespace Org.Apache.REEF.Tests.Network string groupName = "group1"; string operatorName = "reduce"; int numTasks = 10; + string driverId = "driverId"; + string masterTaskId = "task0"; int fanOut = 3; - IMpiDriver mpiDriver = new MpiDriver("driverid", "task0", fanOut, new AvroConfigurationSerializer()); - - var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) - .AddReduce(operatorName, new ReduceOperatorSpec<int>("task0", new IntCodec(), new SumFunction()), TopologyTypes.Tree) - .Build(); - - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<MyTask>.Class) - .Build()) - .Build(); - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - IList<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration taskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(taskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddReduce(operatorName, masterTaskId, new IntCodec(), new SumFunction(), TopologyTypes.Tree) + .Build(); - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName); IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName); @@ -147,40 +114,13 @@ namespace Org.Apache.REEF.Tests.Network int value3 = 99; int fanOut = 3; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) - .AddBroadcast(operatorName, new BroadcastOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree) + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddBroadcast(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree) .Build(); - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<GroupCommunicationTreeTopologyTests.MyTask>.Class) - .Build()) - .Build(); - - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - - IList<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); - - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -250,53 +190,23 @@ namespace Org.Apache.REEF.Tests.Network int numTasks = 10; int fanOut = 3; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.NewCommunicationGroup( - groupName, - numTasks) - .AddBroadcast( + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddBroadcast<int>( broadcastOperatorName, - new BroadcastOperatorSpec<int>( masterTaskId, - new IntCodec()), + new IntCodec(), TopologyTypes.Tree) - .AddReduce( + .AddReduce<int>( reduceOperatorName, - new ReduceOperatorSpec<int>( masterTaskId, new IntCodec(), - new SumFunction()), + new SumFunction(), TopologyTypes.Tree) .Build(); - List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<MyTask>.Class) - .Build()) - .Build(); - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); - - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); //for master task IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName); @@ -352,35 +262,35 @@ namespace Org.Apache.REEF.Tests.Network Assert.AreEqual(i, n8); Assert.AreEqual(i, n9); - int triangleNum9 = TriangleNumber(n9); + int triangleNum9 = GroupCommunicationTests.TriangleNumber(n9); triangleNumberSender9.Send(triangleNum9); - int triangleNum8 = TriangleNumber(n8); + int triangleNum8 = GroupCommunicationTests.TriangleNumber(n8); triangleNumberSender8.Send(triangleNum8); - int triangleNum7 = TriangleNumber(n7); + int triangleNum7 = GroupCommunicationTests.TriangleNumber(n7); triangleNumberSender7.Send(triangleNum7); - int triangleNum6 = TriangleNumber(n6); + int triangleNum6 = GroupCommunicationTests.TriangleNumber(n6); triangleNumberSender6.Send(triangleNum6); - int triangleNum5 = TriangleNumber(n5); + int triangleNum5 = GroupCommunicationTests.TriangleNumber(n5); triangleNumberSender5.Send(triangleNum5); - int triangleNum4 = TriangleNumber(n4); + int triangleNum4 = GroupCommunicationTests.TriangleNumber(n4); triangleNumberSender4.Send(triangleNum4); - int triangleNum3 = TriangleNumber(n3); + int triangleNum3 = GroupCommunicationTests.TriangleNumber(n3); triangleNumberSender3.Send(triangleNum3); - int triangleNum2 = TriangleNumber(n2); + int triangleNum2 = GroupCommunicationTests.TriangleNumber(n2); triangleNumberSender2.Send(triangleNum2); - int triangleNum1 = TriangleNumber(n1); + int triangleNum1 = GroupCommunicationTests.TriangleNumber(n1); triangleNumberSender1.Send(triangleNum1); int sum = sumReducer.Reduce(); - int expected = TriangleNumber(i) * (numTasks - 1); + int expected = GroupCommunicationTests.TriangleNumber(i) * (numTasks - 1); Assert.AreEqual(sum, expected); } } @@ -395,40 +305,13 @@ namespace Org.Apache.REEF.Tests.Network int numTasks = 5; int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) - .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree) + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree) .Build(); - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<GroupCommunicationTreeTopologyTests.MyTask>.Class) - .Build()) - .Build(); - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - - List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); - - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -467,40 +350,13 @@ namespace Org.Apache.REEF.Tests.Network int numTasks = 5; int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) - .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree) + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree) .Build(); - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<MyTask>.Class) - .Build()) - .Build(); - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - - List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); - - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -548,40 +404,13 @@ namespace Org.Apache.REEF.Tests.Network int numTasks = 4; int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) - .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree) + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree) .Build(); - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<MyTask>.Class) - .Build()) - .Build(); - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - - List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); - - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -626,40 +455,13 @@ namespace Org.Apache.REEF.Tests.Network int numTasks = 4; int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) - .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree) + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree) .Build(); - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<MyTask>.Class) - .Build()) - .Build(); - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - - List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); - - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -705,40 +507,13 @@ namespace Org.Apache.REEF.Tests.Network int numTasks = 6; int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - var commGroup = mpiDriver.NewCommunicationGroup(groupName, numTasks) - .AddScatter(operatorName, new ScatterOperatorSpec<int>(masterTaskId, new IntCodec()), TopologyTypes.Tree) + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddScatter(operatorName, masterTaskId, new IntCodec(), TopologyTypes.Tree) .Build(); - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<MyTask>.Class) - .Build()) - .Build(); - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - - List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); - - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -796,52 +571,22 @@ namespace Org.Apache.REEF.Tests.Network int numTasks = 5; int fanOut = 2; - IMpiDriver mpiDriver = new MpiDriver(driverId, masterTaskId, fanOut, new AvroConfigurationSerializer()); + var mpiDriver = GroupCommunicationTests.GetInstanceOfMpiDriver(driverId, masterTaskId, groupName, fanOut, numTasks); - ICommunicationGroupDriver commGroup = mpiDriver.NewCommunicationGroup( - groupName, - numTasks) - .AddScatter( + ICommunicationGroupDriver commGroup = mpiDriver.DefaultGroup + .AddScatter( scatterOperatorName, - new ScatterOperatorSpec<int>( - masterTaskId, - new IntCodec()), TopologyTypes.Tree) + masterTaskId, + new IntCodec(), + TopologyTypes.Tree) .AddReduce( reduceOperatorName, - new ReduceOperatorSpec<int>( - masterTaskId, - new IntCodec(), - new SumFunction()), TopologyTypes.Tree) - .Build(); - - List<IConfiguration> partialConfigs = new List<IConfiguration>(); - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration partialTaskConfig = TangFactory.GetTang().NewConfigurationBuilder( - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<MyTask>.Class) - .Build()) - .Build(); - commGroup.AddTask(taskId); - partialConfigs.Add(partialTaskConfig); - } - - IConfiguration serviceConfig = mpiDriver.GetServiceConfiguration(); - - List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); - - for (int i = 0; i < numTasks; i++) - { - string taskId = "task" + i; - IConfiguration mpiTaskConfig = mpiDriver.GetMpiTaskConfiguration(taskId); - IConfiguration mergedConf = Configurations.Merge(mpiTaskConfig, partialConfigs[i], serviceConfig); - IInjector injector = TangFactory.GetTang().NewInjector(mergedConf); + masterTaskId, + new IntCodec(), + new SumFunction(), + TopologyTypes.Tree).Build(); - IMpiClient mpiClient = injector.GetInstance<IMpiClient>(); - commGroups.Add(mpiClient.GetCommunicationGroup(groupName)); - } + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, mpiDriver, commGroup); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName); IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName); @@ -889,43 +634,5 @@ namespace Org.Apache.REEF.Tests.Network int sum = sumReducer.Reduce(); Assert.AreEqual(sum, 6325); } - - private static void ScatterReceiveReduce(IScatterReceiver<int> receiver, IReduceSender<int> sumSender) - { - List<int> data1 = receiver.Receive(); - int sum1 = data1.Sum(); - sumSender.Send(sum1); - } - - private int TriangleNumber(int n) - { - return Enumerable.Range(1, n).Sum(); - } - - private class SumFunction : IReduceFunction<int> - { - [Inject] - public SumFunction() - { - } - - public int Reduce(IEnumerable<int> elements) - { - return elements.Sum(); - } - } - - private class MyTask : ITask - { - public void Dispose() - { - throw new NotImplementedException(); - } - - public byte[] Call(byte[] memento) - { - throw new NotImplementedException(); - } - } } }
