Repository: incubator-reef Updated Branches: refs/heads/master b9bb7b139 -> c02c80dac
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs new file mode 100644 index 0000000..98a68dd --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Network.Group.Driver; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Network.Group.Topology; + +namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest +{ + public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver)); + + private readonly int _numEvaluators; + private readonly int _numIterations; + private readonly int _chunkSize; + + private readonly IMpiDriver _mpiDriver; + private readonly ICommunicationGroupDriver _commGroup; + private readonly TaskStarter _mpiTaskStarter; + + [Inject] + public PipelinedBroadcastReduceDriver( + [Parameter(typeof (MpiTestConfig.NumEvaluators))] int numEvaluators, + [Parameter(typeof (MpiTestConfig.NumIterations))] int numIterations, + [Parameter(typeof (MpiTestConfig.ChunkSize))] int chunkSize, + MpiDriver mpiDriver) + { + Logger.Log(Level.Info, "*******entering the driver code " + chunkSize); + + Identifier = "BroadcastStartHandler"; + _numEvaluators = numEvaluators; + _numIterations = numIterations; + _chunkSize = chunkSize; + + _mpiDriver = mpiDriver; + + _commGroup = _mpiDriver.DefaultGroup + .AddBroadcast<int[], IntArrayCodec>( + MpiTestConstants.BroadcastOperatorName, + MpiTestConstants.MasterTaskId, + TopologyTypes.Tree, + new PipelineIntDataConverter(_chunkSize)) + .AddReduce<int[], IntArrayCodec>( + MpiTestConstants.ReduceOperatorName, + MpiTestConstants.MasterTaskId, + new ArraySumFunction(), + TopologyTypes.Tree, + new PipelineIntDataConverter(_chunkSize)) + .Build(); + + _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); + + CreateClassHierarchy(); + } + + public string Identifier { get; set; } + + public void OnNext(IEvaluatorRequestor evaluatorRequestor) + { + EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"); + evaluatorRequestor.Submit(request); + } + + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); + IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); + allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); + } + + public void OnNext(IActiveContext activeContext) + { + if (_mpiDriver.IsMasterTaskContext(activeContext)) + { + Logger.Log(Level.Info, "******* Master ID " + activeContext.Id ); + + // Configure Master Task + IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, MpiTestConstants.MasterTaskId) + .Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class) + .Build()) + .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( + GenericType<MpiTestConfig.NumEvaluators>.Class, + _numEvaluators.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<MpiTestConfig.NumIterations, int>( + GenericType<MpiTestConfig.NumIterations>.Class, + _numIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<MpiTestConfig.ArraySize, int>( + GenericType<MpiTestConfig.ArraySize>.Class, + MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture)) + .Build(); + + _commGroup.AddTask(MpiTestConstants.MasterTaskId); + _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + } + else + { + // Configure Slave Task + string slaveTaskId = "SlaveTask-" + activeContext.Id; + IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, slaveTaskId) + .Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class) + .Build()) + .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( + GenericType<MpiTestConfig.NumEvaluators>.Class, + _numEvaluators.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<MpiTestConfig.NumIterations, int>( + GenericType<MpiTestConfig.NumIterations>.Class, + _numIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<MpiTestConfig.ArraySize, int>( + GenericType<MpiTestConfig.ArraySize>.Class, + MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture)) + .Build(); + + _commGroup.AddTask(slaveTaskId); + _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + } + } + + public void OnNext(IFailedEvaluator value) + { + } + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + + private void CreateClassHierarchy() + { + HashSet<string> clrDlls = new HashSet<string>(); + clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); + clrDlls.Add(typeof(ITask).Assembly.GetName().Name); + clrDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); + clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); + clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + ClrHandlerHelper.GenerateClassHierarchy(clrDlls); + } + + private class SumFunction : IReduceFunction<int> + { + [Inject] + public SumFunction() + { + } + + public int Reduce(IEnumerable<int> elements) + { + return elements.Sum(); + } + } + + private class ArraySumFunction : IReduceFunction<int[]> + { + [Inject] + public ArraySumFunction() + { + } + + public int[] Reduce(IEnumerable<int[]> elements) + { + int[] result = null; + int count = 0; + + foreach (var element in elements) + { + if (count == 0) + { + result = element.Clone() as int[]; + } + else + { + if (element.Length != result.Length) + { + throw new Exception("integer arrays are of different sizes"); + } + + for (int i = 0; i < result.Length; i++) + { + result[i] += element[i]; + } + } + + count++; + } + + return result; + } + } + + + private class IntArrayCodec : ICodec<int[]> + { + [Inject] + public IntArrayCodec() + { + } + + public byte[] Encode(int[] obj) + { + byte[] result = new byte[sizeof(Int32) * obj.Length]; + Buffer.BlockCopy(obj, 0, result, 0, result.Length); + return result; + } + + public int[] Decode(byte[] data) + { + if (data.Length % sizeof(Int32) != 0) + { + throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size"); + } + + int[] result = new int[data.Length / sizeof(Int32)]; + Buffer.BlockCopy(data, 0, result, 0, data.Length); + return result; + } + } + + public class PipelineIntDataConverter : IPipelineDataConverter<int[]> + { + readonly int _chunkSize; + + [Inject] + public PipelineIntDataConverter([Parameter(typeof(MpiTestConfig.ChunkSize))] int chunkSize) + { + _chunkSize = chunkSize; + } + + public List<PipelineMessage<int[]>> PipelineMessage(int[] message) + { + List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>(); + int totalChunks = message.Length / _chunkSize; + + if (message.Length % _chunkSize != 0) + { + totalChunks++; + } + + int counter = 0; + for (int i = 0; i < message.Length; i += _chunkSize) + { + int[] data = new int[Math.Min(_chunkSize, message.Length - i)]; + Buffer.BlockCopy(message, i * sizeof(int), data, 0, data.Length * sizeof(int)); + + messageList.Add(counter == totalChunks - 1 + ? new PipelineMessage<int[]>(data, true) + : new PipelineMessage<int[]>(data, false)); + + counter++; + } + + return messageList; + } + + public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage) + { + int size = pipelineMessage.Select(x => x.Data.Length).Sum(); + int[] data = new int[size]; + int offset = 0; + + foreach (var message in pipelineMessage) + { + Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length * sizeof(int)); + offset += message.Data.Length * sizeof(int); + } + + return data; + } + + public IConfiguration GetConfiguration() + { + return TangFactory.GetTang().NewConfigurationBuilder() + .BindNamedParameter<MpiTestConfig.ChunkSize, int>(GenericType<MpiTestConfig.ChunkSize>.Class, _chunkSize.ToString(CultureInfo.InvariantCulture)) + .Build(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs new file mode 100644 index 0000000..0df71e6 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Globalization; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest +{ + [TestClass] + public class PipelinedBroadcastReduceTest : ReefFunctionalTest + { + [TestInitialize] + public void TestSetup() + { + CleanUp(); + } + + [TestCleanup] + public void TestCleanup() + { + CleanUp(); + } + + [TestMethod] + public void TestBroadcastAndReduce() + { + const int numTasks = 9; + + IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( + DriverBridgeConfiguration.ConfigurationModule + .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build()) + .BindNamedParameter<MpiTestConfig.NumIterations, int>( + GenericType<MpiTestConfig.NumIterations>.Class, + MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<MpiTestConfig.NumEvaluators, int>( + GenericType<MpiTestConfig.NumEvaluators>.Class, + numTasks.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<MpiTestConfig.ChunkSize, int>( + GenericType<MpiTestConfig.ChunkSize>.Class, + MpiTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<MpiConfigurationOptions.DriverId>(MpiTestConstants.DriverId) + .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(MpiTestConstants.MasterTaskId) + .BindStringNamedParam<MpiConfigurationOptions.GroupName>(MpiTestConstants.GroupName) + .BindIntNamedParam<MpiConfigurationOptions.FanOut>(MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); + + HashSet<string> appDlls = new HashSet<string>(); + appDlls.Add(typeof(IDriver).Assembly.GetName().Name); + appDlls.Add(typeof(ITask).Assembly.GetName().Name); + appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); + appDlls.Add(typeof(INameClient).Assembly.GetName().Name); + appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + TestRun(appDlls, merged, false, JavaLoggingSetting.VERBOSE); + ValidateSuccessForLocalRuntime(numTasks); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs new file mode 100644 index 0000000..922f294 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest +{ + public class PipelinedMasterTask : ITask + { + private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedMasterTask)); + + private readonly int _numIters; + private readonly int _numReduceSenders; + private readonly int _arraySize; + + private readonly IMpiClient _mpiClient; + private readonly ICommunicationGroupClient _commGroup; + private readonly IBroadcastSender<int[]> _broadcastSender; + private readonly IReduceReceiver<int[]> _sumReducer; + + [Inject] + public PipelinedMasterTask( + [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters, + [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators, + [Parameter(typeof(MpiTestConfig.ArraySize))] int arraySize, + IMpiClient mpiClient) + { + Logger.Log(Level.Info, "Hello from master task"); + _numIters = numIters; + _numReduceSenders = numEvaluators - 1; + _arraySize = arraySize; + _mpiClient = mpiClient; + + _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName); + _broadcastSender = _commGroup.GetBroadcastSender<int[]>(MpiTestConstants.BroadcastOperatorName); + _sumReducer = _commGroup.GetReduceReceiver<int[]>(MpiTestConstants.ReduceOperatorName); + Logger.Log(Level.Info, "finished master task constructor"); + } + + public byte[] Call(byte[] memento) + { + int[] intArr = new int[_arraySize]; + + for (int i = 1; i <= _numIters; i++) + { + for (int j = 0; j < _arraySize; j++) + { + intArr[j] = i; + } + + _broadcastSender.Send(intArr); + int[] sum = _sumReducer.Reduce(); + + Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i); + + int expected = TriangleNumber(i) * _numReduceSenders; + + for (int j = 0; j < intArr.Length; j++) + { + if (sum[j] != TriangleNumber(i) * _numReduceSenders) + { + throw new Exception("Expected " + expected + " but got " + sum); + } + } + } + + return null; + } + + public void Dispose() + { + _mpiClient.Dispose(); + } + + private int TriangleNumber(int n) + { + return Enumerable.Range(1, n).Sum(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs new file mode 100644 index 0000000..5455121 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest +{ + public class PipelinedSlaveTask : ITask + { + private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedSlaveTask)); + + private readonly int _numIterations; + private readonly IMpiClient _mpiClient; + private readonly ICommunicationGroupClient _commGroup; + private readonly IBroadcastReceiver<int[]> _broadcastReceiver; + private readonly IReduceSender<int[]> _triangleNumberSender; + + [Inject] + public PipelinedSlaveTask( + [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters, + IMpiClient mpiClient) + { + Logger.Log(Level.Info, "Hello from slave task"); + + _numIterations = numIters; + _mpiClient = mpiClient; + _commGroup = _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName); + _broadcastReceiver = _commGroup.GetBroadcastReceiver<int[]>(MpiTestConstants.BroadcastOperatorName); + _triangleNumberSender = _commGroup.GetReduceSender<int[]>(MpiTestConstants.ReduceOperatorName); + } + + public byte[] Call(byte[] memento) + { + for (int i = 0; i < _numIterations; i++) + { + // Receive n from Master Task + int[] intVec = _broadcastReceiver.Receive(); + + Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", intVec[0]); + + // Calculate the nth Triangle number and send it back to driver + int triangleNum = TriangleNumber(intVec[0]); + Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i); + + int[] resArr = new int[intVec.Length]; + + for (int j = 0; j < resArr.Length; j++) + { + resArr[j] = triangleNum; + } + + _triangleNumberSender.Send(resArr); + } + + return null; + } + + public void Dispose() + { + _mpiClient.Dispose(); + } + + private int TriangleNumber(int n) + { + return Enumerable.Range(1, n).Sum(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index e559ce3..a915f59 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -64,6 +64,10 @@ under the License. <Compile Include="Functional\MPI\BroadcastReduceTest\SlaveTask.cs" /> <Compile Include="Functional\MPI\MpiTestConfig.cs" /> <Compile Include="Functional\MPI\MpiTestConstants.cs" /> + <Compile Include="Functional\MPI\PipelinedBroadcastReduceTest\PipelinedBroadcastReduceDriver.cs" /> + <Compile Include="Functional\MPI\PipelinedBroadcastReduceTest\PipelinedBroadcastReduceTest.cs" /> + <Compile Include="Functional\MPI\PipelinedBroadcastReduceTest\PipelinedMasterTask.cs" /> + <Compile Include="Functional\MPI\PipelinedBroadcastReduceTest\PipelinedSlaveTask.cs" /> <Compile Include="Functional\MPI\ScatterReduceTest\MasterTask.cs" /> <Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceDriver.cs" /> <Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceTest.cs" /> @@ -78,11 +82,11 @@ under the License. <Compile Include="Utility\TestDriverConfigGenerator.cs" /> <Compile Include="Utility\TestExceptions.cs" /> </ItemGroup> - <ItemGroup> + <ItemGroup> <None Include="run.cmd"> <CopyToOutputDirectory>Always</CopyToOutputDirectory> </None> - <None Include="ConfigFiles\evaluator.conf"> + <None Include="ConfigFiles\evaluator.conf"> <CopyToOutputDirectory>Always</CopyToOutputDirectory> </None> <None Include="packages.config" /> @@ -124,7 +128,7 @@ under the License. <Project>{1b983182-9c30-464c-948d-f87eb93a8240}</Project> <Name>Org.Apache.REEF.Evaluator</Name> </ProjectReference> - <ProjectReference Include="..\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj"> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj"> <Project>{4e69d40a-26d6-4d4a-b96d-729946c07fe1}</Project> <Name>Org.Apache.REEF.Bridge</Name> </ProjectReference>
