Repository: incubator-reef Updated Branches: refs/heads/master 7edb8570e -> 0292caf14
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs new file mode 100644 index 0000000..54a46a1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TaskNode.cs @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Org.Apache.REEF.Network.Group.Topology +{ + /// <summary> + /// Represents a node in the operator topology graph. 
+    /// </summary>
+    internal class TaskNode
+    {
+        private readonly string _groupName;
+        private readonly string _operatorName;
+        private readonly string _taskId;
+        private readonly string _driverId;
+
+        private TaskNode _parent;
+        private readonly List<TaskNode> _children;
+
+        /// <summary>
+        /// Creates a node that has no parent and an empty child list.
+        /// </summary>
+        /// <param name="groupName">Value stored as the node's group name</param>
+        /// <param name="operatorName">Value stored as the node's operator name</param>
+        /// <param name="taskId">Value stored as the node's task id and exposed through TaskId</param>
+        /// <param name="driverId">Value stored as the node's driver id</param>
+        public TaskNode(
+            string groupName,
+            string operatorName,
+            string taskId,
+            string driverId)
+        {
+            _groupName = groupName;
+            _operatorName = operatorName;
+            _taskId = taskId;
+            _driverId = driverId;
+
+            _children = new List<TaskNode>();
+        }
+
+        /// <summary>
+        /// Gets the task id that was passed to the constructor.
+        /// </summary>
+        public string TaskId
+        {
+            // Reads the backing field directly; an auto-property here would never be
+            // assigned by the constructor and would therefore always return null.
+            get { return _taskId; }
+        }
+
+        /// <summary>
+        /// Appends a node to this node's child list.
+        /// </summary>
+        /// <param name="child">The node to append</param>
+        public void AddChild(TaskNode child)
+        {
+            _children.Add(child);
+        }
+
+        /// <summary>
+        /// Replaces this node's parent reference.
+        /// </summary>
+        /// <param name="parent">The new parent node</param>
+        public void SetParent(TaskNode parent)
+        {
+            _parent = parent;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
index b759c24..fb6e183 100644
--- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
+++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj
@@ -95,6 +95,49 @@ under the License.
<Reference Include="System.Xml" /> </ItemGroup> <ItemGroup> + <Compile Include="Group\Codec\GcmMessageProto.cs" /> + <Compile Include="Group\Codec\GroupCommunicationMessageCodec.cs" /> + <Compile Include="Group\Config\MpiConfigurationOptions.cs" /> + <Compile Include="Group\Driver\ICommunicationGroupDriver.cs" /> + <Compile Include="Group\Driver\IMpiDriver.cs" /> + <Compile Include="Group\Driver\Impl\CommunicationGroupDriver.cs" /> + <Compile Include="Group\Driver\Impl\GroupCommunicationMessage.cs" /> + <Compile Include="Group\Driver\Impl\MessageType.cs" /> + <Compile Include="Group\Driver\Impl\MpiDriver.cs" /> + <Compile Include="Group\Driver\Impl\TaskStarter.cs" /> + <Compile Include="Group\Operators\IBroadcastReceiver.cs" /> + <Compile Include="Group\Operators\IBroadcastSender.cs" /> + <Compile Include="Group\Operators\IMpiOperator.cs" /> + <Compile Include="Group\Operators\Impl\BroadcastOperatorSpec.cs" /> + <Compile Include="Group\Operators\Impl\BroadcastReceiver.cs" /> + <Compile Include="Group\Operators\Impl\BroadcastSender.cs" /> + <Compile Include="Group\Operators\Impl\ReduceFunction.cs" /> + <Compile Include="Group\Operators\Impl\ReduceOperatorSpec.cs" /> + <Compile Include="Group\Operators\Impl\ReduceReceiver.cs" /> + <Compile Include="Group\Operators\Impl\ReduceSender.cs" /> + <Compile Include="Group\Operators\Impl\ScatterOperatorSpec.cs" /> + <Compile Include="Group\Operators\Impl\ScatterReceiver.cs" /> + <Compile Include="Group\Operators\Impl\ScatterSender.cs" /> + <Compile Include="Group\Operators\Impl\Sender.cs" /> + <Compile Include="Group\Operators\IOperatorSpec.cs" /> + <Compile Include="Group\Operators\IReduceFunction.cs" /> + <Compile Include="Group\Operators\IReduceReceiver.cs" /> + <Compile Include="Group\Operators\IReduceSender.cs" /> + <Compile Include="Group\Operators\IScatterReceiver.cs" /> + <Compile Include="Group\Operators\IScatterSender.cs" /> + <Compile Include="Group\Task\ICommunicationGroupClient.cs" /> + <Compile 
Include="Group\Task\ICommunicationGroupNetworkObserver.cs" /> + <Compile Include="Group\Task\IMpiClient.cs" /> + <Compile Include="Group\Task\IMpiNetworkObserver.cs" /> + <Compile Include="Group\Task\Impl\CommunicationGroupClient.cs" /> + <Compile Include="Group\Task\Impl\CommunicationGroupNetworkObserver.cs" /> + <Compile Include="Group\Task\Impl\MpiClient.cs" /> + <Compile Include="Group\Task\Impl\MpiNetworkObserver.cs" /> + <Compile Include="Group\Task\Impl\NodeStruct.cs" /> + <Compile Include="Group\Task\Impl\OperatorTopology.cs" /> + <Compile Include="Group\Topology\FlatTopology.cs" /> + <Compile Include="Group\Topology\ITopology.cs" /> + <Compile Include="Group\Topology\TaskNode.cs" /> <Compile Include="Naming\Codec\NamingLookupRequestCodec.cs" /> <Compile Include="Naming\Codec\NamingLookupResponseCodec.cs" /> <Compile Include="Naming\Codec\NamingRegisterRequestCodec.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs new file mode 100644 index 0000000..8d00e69 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Examples.MachineLearning.KMeans;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
+{
+    /// <summary>
+    /// Functional tests for the KMeans example. One test drives the legacy tasks
+    /// in-process through the file system; the other two submit the job through
+    /// TestRun. All three test methods are currently marked [Ignore].
+    /// </summary>
+    [TestClass]
+    public class TestKMeans : ReefFunctionalTest
+    {
+        // Passed as the "K" argument to the KMeans helpers - presumably the number of clusters; TODO confirm.
+        private const int K = 3;
+        // Number of DataPartitionCache / LegacyKMeansTask instances created by the direct-run test.
+        private const int Partitions = 2;
+        private const string SmallMouseDataFile = @"mouseData_small.csv";
+        private const string MouseDataFile = @"mouseData.csv";
+
+        // Nothing in this class sets the flag to true, so the full data file is used.
+        private bool _useSmallDataSet = false;
+        private string _dataFile = MouseDataFile;
+
+        /// <summary>
+        /// Selects the data file according to _useSmallDataSet, then runs CleanUp and Init.
+        /// </summary>
+        [TestInitialize()]
+        public void TestSetup()
+        {
+            if (_useSmallDataSet)
+            {
+                _dataFile = SmallMouseDataFile;
+            }
+
+            CleanUp();
+            Init();
+        }
+
+        /// <summary>
+        /// Writes a marker line to the console and runs CleanUp after each test.
+        /// </summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            Console.WriteLine("Post test check and clean up");
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Runs the KMeans iterations in-process: every LegacyKMeansTask writes to the
+        /// execution directory, the partial means are aggregated into new centroids, and
+        /// the loop ends when the loss is exactly equal to the previous loss.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">The loss increased between two iterations</exception>
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Test KMeans clustering with things directly run without reef")]
+        [DeploymentItem(@".")]
+        [DeploymentItem(@"Data", ".")]
+        [Ignore]
+        [Timeout(180 * 1000)]
+        public void TestKMeansOnDirectRunViaFileSystem()
+        {
+            int iteration = 0;
+            // Each run gets its own directory, named with the first four hex characters of a fresh GUID.
+            string executionDirectory = Path.Combine(Directory.GetCurrentDirectory(), Constants.KMeansExecutionBaseDirectory, Guid.NewGuid().ToString("N").Substring(0, 4));
+            List<DataVector> centroids = DataVector.ShuffleDataAndGetInitialCentriods(_dataFile, Partitions, K, executionDirectory);
+
+            // initialize all tasks
+            List<LegacyKMeansTask> tasks = new List<LegacyKMeansTask>();
+            List<DataVector> labeledData = new List<DataVector>();
+            for (int i = 0; i < Partitions; i++)
+            {
+                DataPartitionCache p = new DataPartitionCache(i, executionDirectory);
+                tasks.Add(new LegacyKMeansTask(p, K, executionDirectory));
+                labeledData.AddRange(p.DataVectors);
+            }
+
+            float loss = float.MaxValue;
+            while (true)
+            {
+                for (int i = 0; i < Partitions; i++)
+                {
+                    tasks[i].CallWithWritingToFileSystem(null);
+                }
+                List<DataVector> newCentroids = PartialMean.AggregateTrueMeansToFileSystem(Partitions, K, executionDirectory);
+                DataVector.WriteToCentroidFile(newCentroids, executionDirectory);
+                centroids = newCentroids;
+                float newLoss = LegacyKMeansTask.ComputeLossFunction(centroids, labeledData);
+                if (newLoss > loss)
+                {
+                    throw new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "The new loss {0} is larger than previous loss {1}, while loss function must be monotonically decreasing across iterations", newLoss, loss));
+                }
+                // NOTE(review): convergence is an exact float comparison; the loop only stops
+                // once two consecutive iterations produce bit-identical loss values.
+                else if (newLoss.Equals(loss))
+                {
+                    Console.WriteLine(string.Format(CultureInfo.InvariantCulture, "KMeans clustering has converged with a loss value of {0} at iteration {1} ", newLoss, iteration));
+                    break;
+                }
+                else
+                {
+                    loss = newLoss;
+                }
+                iteration++;
+            }
+        }
+
+        /// <summary>
+        /// Submits the KMeans driver through TestRun with the local-runtime flag set and
+        /// then validates success for Partitions + 1 (presumably the partition tasks plus
+        /// one master task - TODO confirm).
+        /// </summary>
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Test KMeans clustering on reef local runtime with group communications")]
+        [DeploymentItem(@".")]
+        [DeploymentItem(@"Data", ".")]
+        [Ignore]
+        [Timeout(180 * 1000)]
+        public void TestKMeansOnLocalRuntimeWithGroupCommunications()
+        {
+            IsOnLocalRuntiime = true;
+            TestRun(AssembliesToCopy(), DriverConfiguration());
+            ValidateSuccessForLocalRuntime(Partitions + 1);
+        }
+
+        /// <summary>
+        /// Submits the KMeans driver through TestRun with runOnYarn set to true.
+        /// </summary>
+        [TestMethod, Priority(1), TestCategory("FunctionalGated")]
+        [Description("Test KMeans clustering on reef YARN runtime - one box")]
+        [DeploymentItem(@".")]
+        [DeploymentItem(@"Data", ".")]
+        [Timeout(180 * 1000)]
+        [Ignore] // ignored by default
+        public void TestKMeansOnYarnOneBoxWithGroupCommunications()
+        {
+            TestRun(AssembliesToCopy(), DriverConfiguration(), runOnYarn: true);
+            // NOTE(review): the argument is a string literal, so this assertion can never fail;
+            // it verifies nothing about the run.
+            Assert.IsNotNull("BreakPointChecker");
+        }
+
+        /// <summary>
+        /// Builds the driver bridge configuration: KMeansDriverHandlers is registered for the
+        /// four driver events below and the data file is passed as a command line argument.
+        /// </summary>
+        /// <returns>The built driver configuration</returns>
+        private IConfiguration DriverConfiguration()
+        {
+            return DriverBridgeConfiguration.ConfigurationModule
+                .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<KMeansDriverHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<KMeansDriverHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<KMeansDriverHandlers>.Class)
+                .Set(DriverBridgeConfiguration.OnContextActive, GenericType<KMeansDriverHandlers>.Class)
+                .Set(DriverBridgeConfiguration.CommandLineArguments, "DataFile:" + _dataFile)
+                .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Collects the names of the assemblies that declare the KMeans handlers and tasks,
+        /// INameClient and INetworkService; the set is handed to TestRun.
+        /// </summary>
+        /// <returns>The set of assembly names</returns>
+        private HashSet<string> AssembliesToCopy()
+        {
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(KMeansDriverHandlers).Assembly.GetName().Name);
+            appDlls.Add(typeof(LegacyKMeansTask).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+            return appDlls;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
new file mode 100644
index 0000000..7472d56
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceDriver.cs
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Operators.Impl;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
+{
+    /// <summary>
+    /// Driver for the broadcast/reduce functional test. It requests the evaluators,
+    /// submits the group-communication context and service to each allocated
+    /// evaluator, and queues one MasterTask plus SlaveTasks on the active contexts.
+    /// </summary>
+    public class BroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(BroadcastReduceDriver));
+
+        private readonly int _numEvaluators;
+        private readonly int _numIterations;
+
+        private readonly IMpiDriver _mpiDriver;
+        private readonly ICommunicationGroupDriver _commGroup;
+        private readonly TaskStarter _mpiTaskStarter;
+
+        /// <summary>
+        /// Creates the MPI driver, defines a communication group holding an int broadcast
+        /// and an int sum-reduce operator (both rooted at the master task id), and
+        /// generates the CLR class hierarchy.
+        /// </summary>
+        /// <param name="numEvaluators">Number of evaluators to request; also the group size</param>
+        /// <param name="numIterations">Iteration count forwarded to every task configuration</param>
+        /// <param name="confSerializer">Serializer handed to the MpiDriver</param>
+        [Inject]
+        public BroadcastReduceDriver(
+            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIterations,
+            AvroConfigurationSerializer confSerializer)
+        {
+            Identifier = "BroadcastStartHandler";
+            _numEvaluators = numEvaluators;
+            _numIterations = numIterations;
+
+            _mpiDriver = new MpiDriver(
+                MpiTestConstants.DriverId,
+                MpiTestConstants.MasterTaskId,
+                confSerializer);
+
+            _commGroup = _mpiDriver.NewCommunicationGroup(
+                    MpiTestConstants.GroupName,
+                    numEvaluators)
+                .AddBroadcast(
+                    MpiTestConstants.BroadcastOperatorName,
+                    new BroadcastOperatorSpec<int>(
+                        MpiTestConstants.MasterTaskId,
+                        new IntCodec()))
+                .AddReduce(
+                    MpiTestConstants.ReduceOperatorName,
+                    new ReduceOperatorSpec<int>(
+                        MpiTestConstants.MasterTaskId,
+                        new IntCodec(),
+                        new SumFunction()))
+                .Build();
+
+            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
+
+            CreateClassHierarchy();
+        }
+
+        public string Identifier { get; set; }
+
+        /// <summary>
+        /// Submits a single request for all evaluators.
+        /// </summary>
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
+            evaluatorRequestor.Submit(request);
+        }
+
+        /// <summary>
+        /// Submits the MPI driver's context and service configurations to the new evaluator.
+        /// </summary>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
+            IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
+            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
+        }
+
+        /// <summary>
+        /// Registers a task for the context with the communication group and queues it:
+        /// the master task on the master context, a slave task everywhere else.
+        /// </summary>
+        public void OnNext(IActiveContext activeContext)
+        {
+            string taskId;
+            IConfiguration partialTaskConf;
+
+            if (_mpiDriver.IsMasterTaskContext(activeContext))
+            {
+                taskId = MpiTestConstants.MasterTaskId;
+                partialTaskConf = BuildPartialTaskConfiguration<MasterTask>(taskId);
+            }
+            else
+            {
+                // Same "SlaveTask-" prefix as before, now taken from the shared constant.
+                taskId = MpiTestConstants.SlaveTaskId + activeContext.Id;
+                partialTaskConf = BuildPartialTaskConfiguration<SlaveTask>(taskId);
+            }
+
+            _commGroup.AddTask(taskId);
+            _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+        }
+
+        /// <summary>
+        /// Logs the failed evaluator; no recovery is attempted.
+        /// </summary>
+        public void OnNext(IFailedEvaluator value)
+        {
+            LOGGER.Log(Level.Error, "Evaluator {0} failed.", value.Id);
+        }
+
+        /// <summary>
+        /// Logs the error; it is not rethrown.
+        /// </summary>
+        public void OnError(Exception error)
+        {
+            LOGGER.Log(Level.Error, "BroadcastReduceDriver received an error: {0}", error);
+        }
+
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Builds the partial configuration shared by master and slave tasks: task
+        /// identifier, task class, and the evaluator and iteration counts.
+        /// </summary>
+        /// <typeparam name="TTask">Task class to bind</typeparam>
+        /// <param name="taskId">Identifier assigned to the task</param>
+        /// <returns>The built partial task configuration</returns>
+        private IConfiguration BuildPartialTaskConfiguration<TTask>(string taskId) where TTask : ITask
+        {
+            return TangFactory.GetTang().NewConfigurationBuilder(
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, taskId)
+                        .Set(TaskConfiguration.Task, GenericType<TTask>.Class)
+                        .Build())
+                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
+                    GenericType<MpiTestConfig.NumEvaluators>.Class,
+                    _numEvaluators.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.NumIterations, int>(
+                    GenericType<MpiTestConfig.NumIterations>.Class,
+                    _numIterations.ToString(CultureInfo.InvariantCulture))
+                .Build();
+        }
+
+        /// <summary>
+        /// Generates the class hierarchy for the assemblies that declare the driver,
+        /// task, test, naming and network-service types.
+        /// </summary>
+        private void CreateClassHierarchy()
+        {
+            HashSet<string> clrDlls = new HashSet<string>();
+            clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
+        }
+
+        /// <summary>
+        /// Reduce function that adds up all received integers.
+        /// </summary>
+        private class SumFunction : IReduceFunction<int>
+        {
+            [Inject]
+            public SumFunction()
+            {
+            }
+
+            public int Reduce(IEnumerable<int> elements)
+            {
+                return elements.Sum();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
new file mode 100644
index 0000000..c74c923
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/BroadcastReduceTest.cs
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
+{
+    /// <summary>
+    /// Functional test that submits BroadcastReduceDriver through TestRun and
+    /// validates the outcome for the local runtime.
+    /// </summary>
+    [TestClass]
+    public class BroadcastReduceTest : ReefFunctionalTest
+    {
+        /// <summary>
+        /// Runs CleanUp before each test.
+        /// </summary>
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Runs CleanUp after each test.
+        /// </summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Configures the driver with four evaluators and the iteration count from
+        /// MpiTestConstants, submits it, and validates success for four tasks.
+        /// </summary>
+        [TestMethod]
+        public void TestBroadcastAndReduce()
+        {
+            int taskCount = 4;
+
+            // Event handler bindings for the driver bridge.
+            IConfiguration bridgeConfiguration = DriverBridgeConfiguration.ConfigurationModule
+                .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class)
+                .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                .Build();
+
+            // Named parameters that are injected into the driver's constructor.
+            IConfiguration driverConfiguration = TangFactory.GetTang().NewConfigurationBuilder(bridgeConfiguration)
+                .BindNamedParameter<MpiTestConfig.NumIterations, int>(
+                    GenericType<MpiTestConfig.NumIterations>.Class,
+                    MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
+                    GenericType<MpiTestConfig.NumEvaluators>.Class,
+                    taskCount.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            HashSet<string> assemblyNames = new HashSet<string>
+            {
+                typeof(IDriver).Assembly.GetName().Name,
+                typeof(ITask).Assembly.GetName().Name,
+                typeof(BroadcastReduceDriver).Assembly.GetName().Name,
+                typeof(INameClient).Assembly.GetName().Name,
+                typeof(INetworkService<>).Assembly.GetName().Name
+            };
+
+            TestRun(assemblyNames, driverConfiguration, false, JavaLoggingSetting.VERBOSE);
+            ValidateSuccessForLocalRuntime(taskCount);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
new file mode 100644
index 0000000..78f8840
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/MasterTask.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
+{
+    /// <summary>
+    /// Master side of the broadcast/reduce test: broadcasts the iteration number,
+    /// receives the reduced sum, and checks it against the expected value.
+    /// </summary>
+    public class MasterTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
+
+        private readonly int _numIters;
+        private readonly int _numReduceSenders;
+
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastSender<int> _broadcastSender;
+        private readonly IReduceReceiver<int> _sumReducer;
+
+        /// <summary>
+        /// Looks up the communication group and its broadcast sender and reduce receiver.
+        /// </summary>
+        /// <param name="numIters">Number of broadcast/reduce rounds to run</param>
+        /// <param name="numEvaluators">Total evaluator count; one less is used as the number of reduce senders</param>
+        /// <param name="mpiClient">Client used to obtain the communication group; disposed with the task</param>
+        [Inject]
+        public MasterTask(
+            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
+            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
+            IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from master task");
+            _numIters = numIters;
+            _numReduceSenders = numEvaluators - 1;
+            _mpiClient = mpiClient;
+
+            _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
+            _broadcastSender = _commGroup.GetBroadcastSender<int>(MpiTestConstants.BroadcastOperatorName);
+            _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Runs the broadcast/reduce rounds, numbered from 1 to the iteration count.
+        /// </summary>
+        /// <param name="memento">Not used</param>
+        /// <returns>Always null</returns>
+        /// <exception cref="InvalidOperationException">A reduced sum differs from the expected value</exception>
+        public byte[] Call(byte[] memento)
+        {
+            for (int i = 1; i <= _numIters; i++)
+            {
+                // Each slave task calculates the nth triangle number
+                _broadcastSender.Send(i);
+
+                // Sum up all of the calculated triangle numbers
+                int sum = _sumReducer.Reduce();
+                _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
+
+                // Every reduce sender contributes the same triangle number.
+                int expected = TriangleNumber(i) * _numReduceSenders;
+                if (sum != expected)
+                {
+                    throw new InvalidOperationException("Expected " + expected + " but got " + sum);
+                }
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the MPI client.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + n.
+        /// </summary>
+        private static int TriangleNumber(int n)
+        {
+            return Enumerable.Range(1, n).Sum();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs new file mode 100644 index 0000000..20b9351 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/BroadcastReduceTest/SlaveTask.cs @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.BroadcastReduceTest
+{
+    /// <summary>
+    /// Slave side of the broadcast/reduce test: receives a number through the
+    /// broadcast operator and sends its triangle number through the reduce operator.
+    /// </summary>
+    public class SlaveTask : ITask
+    {
+        private static Logger _logger = Logger.GetLogger(typeof(SlaveTask));
+
+        private int _numIterations;
+        private IMpiClient _mpiClient;
+        private ICommunicationGroupClient _commGroup;
+        private IBroadcastReceiver<int> _broadcastReceiver;
+        private IReduceSender<int> _triangleNumberSender;
+
+        /// <summary>
+        /// Looks up the communication group and its broadcast receiver and reduce sender.
+        /// </summary>
+        /// <param name="numIters">Number of receive/send rounds to run</param>
+        /// <param name="mpiClient">Client used to obtain the communication group; disposed with the task</param>
+        [Inject]
+        public SlaveTask(
+            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
+            IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from slave task");
+
+            _mpiClient = mpiClient;
+            _numIterations = numIters;
+
+            ICommunicationGroupClient group = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
+            _commGroup = group;
+            _broadcastReceiver = group.GetBroadcastReceiver<int>(MpiTestConstants.BroadcastOperatorName);
+            _triangleNumberSender = group.GetReduceSender<int>(MpiTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Runs the receive/compute/send rounds, numbered from 0.
+        /// </summary>
+        /// <param name="memento">Not used</param>
+        /// <returns>Always null</returns>
+        public byte[] Call(byte[] memento)
+        {
+            int round = 0;
+            while (round < _numIterations)
+            {
+                // Wait for the next value from the broadcast operator.
+                int received = _broadcastReceiver.Receive();
+                _logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", received);
+
+                // Compute the triangle number and hand it to the reduce operator.
+                int result = TriangleNumber(received);
+                _logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", result, round);
+                _triangleNumberSender.Send(result);
+
+                round++;
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the MPI client.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + n.
+        /// </summary>
+        private int TriangleNumber(int n)
+        {
+            var oneToN = Enumerable.Range(1, n);
+            return oneToN.Sum();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs new file mode 100644 index 0000000..9450e20 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConfig.cs @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI
+{
+    /// <summary>
+    /// Named parameters used to pass test settings to the MPI test driver and tasks.
+    /// </summary>
+    public class MpiTestConfig
+    {
+        /// <summary>
+        /// Integer named parameter: number of iterations of messages to send.
+        /// </summary>
+        [NamedParameter("Number of iterations of messages to send")]
+        public class NumIterations : Name<int>
+        {
+        }
+
+        /// <summary>
+        /// Integer named parameter: number of evaluators.
+        /// </summary>
+        [NamedParameter("Number of Evaluators")]
+        public class NumEvaluators : Name<int>
+        {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/0292caf1/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
new file mode 100644
index 0000000..5cff899
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/MpiTestConstants.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
namespace Org.Apache.REEF.Tests.Functional.MPI
{
    /// <summary>
    /// Compile-time constants (identifiers and settings) shared by the MPI functional tests.
    /// </summary>
    internal class MpiTestConstants
    {
        // Driver and communication group identifiers.
        public const string DriverId = "BroadcastReduceDriver";
        public const string GroupName = "BroadcastReduceGroup";

        // Names under which the group communication operators are registered.
        public const string BroadcastOperatorName = "Broadcast";
        public const string ReduceOperatorName = "Reduce";
        public const string ScatterOperatorName = "Scatter";

        // Task identifiers. SlaveTaskId is a prefix; a number is appended to it
        // to form each slave task's full identifier.
        public const string MasterTaskId = "MasterTask";
        public const string SlaveTaskId = "SlaveTask-";

        // Iteration count available to the tests.
        public const int NumIterations = 10;
    }
}
using System.Collections.Generic;
using System.Linq;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
{
    /// <summary>
    /// Master side of the scatter/reduce functional test: scatters a list of
    /// integers to the slave tasks and then receives the reduced result.
    /// </summary>
    public class MasterTask : ITask
    {
        /// <summary>
        /// Number of slave task identifiers listed in the scatter order.
        /// </summary>
        private const int NumScatterTargets = 4;

        private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));

        private readonly IMpiClient _mpiClient;
        private readonly ICommunicationGroupClient _commGroup;
        private readonly IScatterSender<int> _scatterSender;
        private readonly IReduceReceiver<int> _sumReducer;

        /// <summary>
        /// Looks up the communication group and the scatter/reduce operators
        /// by the names defined in <see cref="MpiTestConstants"/>.
        /// </summary>
        /// <param name="mpiClient">The injected MPI client; disposed in <see cref="Dispose"/>.</param>
        [Inject]
        public MasterTask(IMpiClient mpiClient)
        {
            _logger.Log(Level.Info, "Hello from master task");
            _mpiClient = mpiClient;

            _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
            _scatterSender = _commGroup.GetScatterSender<int>(MpiTestConstants.ScatterOperatorName);
            _sumReducer = _commGroup.GetReduceReceiver<int>(MpiTestConstants.ReduceOperatorName);
        }

        /// <summary>
        /// Sends the integers 1..100 through the scatter operator in the order
        /// given by <see cref="GetScatterOrder"/>, then logs the value returned
        /// by the reduce operator.
        /// </summary>
        /// <param name="memento">Unused.</param>
        /// <returns>Always null.</returns>
        public byte[] Call(byte[] memento)
        {
            List<int> data = Enumerable.Range(1, 100).ToList();
            List<string> order = GetScatterOrder();
            _scatterSender.Send(data, order);

            int sum = _sumReducer.Reduce();
            _logger.Log(Level.Info, "Received sum: {0}", sum);

            return null;
        }

        /// <summary>
        /// Disposes the underlying MPI client.
        /// </summary>
        public void Dispose()
        {
            _mpiClient.Dispose();
        }

        /// <summary>
        /// Builds the slave task identifiers in descending numeric order
        /// ("SlaveTask-4" down to "SlaveTask-1").
        /// </summary>
        /// <returns>The task identifiers to pass as the scatter order.</returns>
        private List<string> GetScatterOrder()
        {
            // Derive the identifiers from the shared prefix so they cannot drift
            // from the identifiers assembled elsewhere from the same constant.
            return Enumerable.Range(1, NumScatterTargets)
                .Reverse()
                .Select(taskNum => MpiTestConstants.SlaveTaskId + taskNum)
                .ToList();
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System;
using System.Collections.Generic;
using System.Linq;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Network.Group.Driver;
using Org.Apache.REEF.Network.Group.Driver.Impl;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Operators.Impl;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
using Org.Apache.REEF.Wake.Remote.Impl;

namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
{
    /// <summary>
    /// Driver for the scatter/reduce functional test. It requests evaluators,
    /// submits a context and service to each one, and queues one master task
    /// and the slave tasks on the resulting contexts.
    /// </summary>
    public class ScatterReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ScatterReduceDriver));

        private readonly int _numEvaluators;

        private readonly IMpiDriver _mpiDriver;
        private readonly ICommunicationGroupDriver _commGroup;
        private readonly TaskStarter _mpiTaskStarter;

        /// <summary>
        /// Creates the MPI driver, defines a communication group with a scatter
        /// and a reduce operator (both rooted at the master task id), and
        /// generates the CLR class hierarchy.
        /// </summary>
        /// <param name="numEvaluators">Number of evaluators to request; also passed as the group size and to the task starter.</param>
        /// <param name="confSerializer">Configuration serializer handed to the MPI driver.</param>
        [Inject]
        public ScatterReduceDriver(
            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
            AvroConfigurationSerializer confSerializer)
        {
            // NOTE(review): this identifier looks copied from the broadcast test;
            // kept unchanged here - confirm whether anything depends on its value.
            Identifier = "BroadcastStartHandler";
            _numEvaluators = numEvaluators;

            _mpiDriver = new MpiDriver(
                MpiTestConstants.DriverId,
                MpiTestConstants.MasterTaskId,
                confSerializer);

            _commGroup = _mpiDriver.NewCommunicationGroup(
                    MpiTestConstants.GroupName,
                    numEvaluators)
                .AddScatter(
                    MpiTestConstants.ScatterOperatorName,
                    new ScatterOperatorSpec<int>(
                        MpiTestConstants.MasterTaskId,
                        new IntCodec()))
                .AddReduce(
                    MpiTestConstants.ReduceOperatorName,
                    new ReduceOperatorSpec<int>(
                        MpiTestConstants.MasterTaskId,
                        new IntCodec(),
                        new SumFunction()))
                .Build();

            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);

            CreateClassHierarchy();
        }

        /// <summary>
        /// Identifier of this start handler.
        /// </summary>
        public string Identifier { get; set; }

        /// <summary>
        /// Submits a single request for the configured number of evaluators.
        /// </summary>
        /// <param name="evaluatorRequestor">Requestor used to submit the evaluator request.</param>
        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
        {
            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
            evaluatorRequestor.Submit(request);
        }

        /// <summary>
        /// Submits the MPI driver's context and service configurations to a
        /// newly allocated evaluator.
        /// </summary>
        /// <param name="allocatedEvaluator">The evaluator that was allocated.</param>
        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
        {
            IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
            IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
        }

        /// <summary>
        /// Builds the task configuration for the given context (master or slave,
        /// as decided by the MPI driver), registers the task id with the
        /// communication group and queues the task on the task starter.
        /// </summary>
        /// <param name="activeContext">The context that became active.</param>
        public void OnNext(IActiveContext activeContext)
        {
            string taskId;
            IConfiguration partialTaskConf;

            if (_mpiDriver.IsMasterTaskContext(activeContext))
            {
                // Configure Master Task
                taskId = MpiTestConstants.MasterTaskId;
                partialTaskConf = TaskConfiguration.ConfigurationModule
                    .Set(TaskConfiguration.Identifier, taskId)
                    .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class)
                    .Build();
            }
            else
            {
                // Configure Slave Task: the id is the shared prefix plus the context number
                taskId = MpiTestConstants.SlaveTaskId
                    + _mpiDriver.GetContextNum(activeContext);
                partialTaskConf = TaskConfiguration.ConfigurationModule
                    .Set(TaskConfiguration.Identifier, taskId)
                    .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class)
                    .Build();
            }

            _commGroup.AddTask(taskId);
            _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
        }

        /// <summary>
        /// Logs the failed evaluator; no recovery is attempted.
        /// </summary>
        /// <param name="value">The evaluator that failed.</param>
        public void OnNext(IFailedEvaluator value)
        {
            // Previously a silent no-op, which hid evaluator failures from the test log.
            LOGGER.Log(Level.Error, "Evaluator failed: {0}", value);
        }

        /// <summary>
        /// Logs the reported error; no recovery is attempted.
        /// </summary>
        /// <param name="error">The error delivered to this observer.</param>
        public void OnError(Exception error)
        {
            // Previously a silent no-op, which hid driver-side errors from the test log.
            LOGGER.Log(Level.Error, "ScatterReduceDriver received an error: {0}", error);
        }

        /// <summary>
        /// Intentionally does nothing.
        /// </summary>
        public void OnCompleted()
        {
        }

        /// <summary>
        /// Generates the class hierarchy from the names of the assemblies that
        /// contain the driver, task, name client and network service types.
        /// </summary>
        private void CreateClassHierarchy()
        {
            HashSet<string> clrDlls = new HashSet<string>
            {
                typeof(IDriver).Assembly.GetName().Name,
                typeof(ITask).Assembly.GetName().Name,
                typeof(ScatterReduceDriver).Assembly.GetName().Name,
                typeof(INameClient).Assembly.GetName().Name,
                typeof(INetworkService<>).Assembly.GetName().Name
            };

            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
        }

        /// <summary>
        /// Reduce function that adds up all received integers.
        /// </summary>
        private class SumFunction : IReduceFunction<int>
        {
            [Inject]
            public SumFunction()
            {
            }

            /// <summary>
            /// Returns the sum of <paramref name="elements"/>.
            /// </summary>
            /// <param name="elements">The values to add up.</param>
            /// <returns>The sum of all elements.</returns>
            public int Reduce(IEnumerable<int> elements)
            {
                return elements.Sum();
            }
        }
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

using System.Collections.Generic;
using System.Globalization;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Org.Apache.REEF.Common.Io;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
{
    /// <summary>
    /// Functional test that runs <see cref="ScatterReduceDriver"/> and
    /// validates the run for the local runtime.
    /// </summary>
    [TestClass]
    public class ScatterReduceTest : ReefFunctionalTest
    {
        /// <summary>
        /// Runs the base-class clean-up before each test.
        /// </summary>
        [TestInitialize]
        public void TestSetup()
        {
            CleanUp();
        }

        /// <summary>
        /// Runs the base-class clean-up after each test.
        /// </summary>
        [TestCleanup]
        public void TestCleanup()
        {
            CleanUp();
        }

        /// <summary>
        /// Wires <see cref="ScatterReduceDriver"/> into every driver bridge event
        /// it handles, binds the evaluator count, runs the test and validates
        /// success for the local runtime.
        /// </summary>
        [TestMethod]
        public void TestScatterAndReduce()
        {
            int numTasks = 5;

            // Driver bridge part of the configuration: the same driver class handles every event.
            IConfiguration bridgeConfig = DriverBridgeConfiguration.ConfigurationModule
                .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<ScatterReduceDriver>.Class)
                .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<ScatterReduceDriver>.Class)
                .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<ScatterReduceDriver>.Class)
                .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<ScatterReduceDriver>.Class)
                .Set(DriverBridgeConfiguration.OnContextActive, GenericType<ScatterReduceDriver>.Class)
                .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
                .Build();

            // Add the evaluator count on top of the bridge configuration.
            IConfiguration driverConfig = TangFactory.GetTang()
                .NewConfigurationBuilder(bridgeConfig)
                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
                    GenericType<MpiTestConfig.NumEvaluators>.Class,
                    numTasks.ToString(CultureInfo.InvariantCulture))
                .Build();

            // Names of the assemblies passed to the test run.
            HashSet<string> appDlls = new HashSet<string>
            {
                typeof(IDriver).Assembly.GetName().Name,
                typeof(ITask).Assembly.GetName().Name,
                typeof(ScatterReduceDriver).Assembly.GetName().Name,
                typeof(INameClient).Assembly.GetName().Name,
                typeof(INetworkService<>).Assembly.GetName().Name
            };

            TestRun(appDlls, driverConfig, false, JavaLoggingSetting.VERBOSE);
            ValidateSuccessForLocalRuntime(numTasks);
        }
    }
}
using System.Collections.Generic;
using System.Linq;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Network.Group.Operators;
using Org.Apache.REEF.Network.Group.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Tests.Functional.MPI.ScatterReduceTest
{
    /// <summary>
    /// Slave side of the scatter/reduce functional test: receives a list of
    /// integers from the scatter operator and sends their sum to the reduce
    /// operator.
    /// </summary>
    public class SlaveTask : ITask
    {
        private static Logger _logger = Logger.GetLogger(typeof(SlaveTask));

        private IMpiClient _mpiClient;
        private ICommunicationGroupClient _commGroup;
        private IScatterReceiver<int> _scatterReceiver;
        private IReduceSender<int> _sumSender;

        /// <summary>
        /// Looks up the communication group and the scatter/reduce operators
        /// by the names defined in <see cref="MpiTestConstants"/>.
        /// </summary>
        /// <param name="mpiClient">The injected MPI client; disposed in <see cref="Dispose"/>.</param>
        [Inject]
        public SlaveTask(IMpiClient mpiClient)
        {
            _logger.Log(Level.Info, "Hello from slave task");

            ICommunicationGroupClient group = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);

            _mpiClient = mpiClient;
            _commGroup = group;
            _scatterReceiver = group.GetScatterReceiver<int>(MpiTestConstants.ScatterOperatorName);
            _sumSender = group.GetReduceSender<int>(MpiTestConstants.ReduceOperatorName);
        }

        /// <summary>
        /// Receives this task's integers, logs them, and sends their sum back
        /// through the reduce operator.
        /// </summary>
        /// <param name="memento">Unused.</param>
        /// <returns>Always null.</returns>
        public byte[] Call(byte[] memento)
        {
            List<int> received = _scatterReceiver.Receive();
            string receivedText = string.Join(" ", received);
            _logger.Log(Level.Info, "Received data: {0}", receivedText);

            int partialSum = received.Sum();
            _logger.Log(Level.Info, "Sending back sum: {0}", partialSum);
            _sumSender.Send(partialSum);

            return null;
        }

        /// <summary>
        /// Disposes the underlying MPI client.
        /// </summary>
        public void Dispose()
        {
            _mpiClient.Dispose();
        }
    }
}
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index f8b4385..45c39c5 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -126,6 +126,17 @@ under the License. <Compile Include="Functional\Messaging\MessageDriver.cs" /> <Compile Include="Functional\Messaging\MessageTask.cs" /> <Compile Include="Functional\Messaging\TestTaskMessage.cs" /> + <Compile Include="Functional\ML\KMeans\TestKMeans.cs" /> + <Compile Include="Functional\MPI\BroadcastReduceTest\BroadcastReduceDriver.cs" /> + <Compile Include="Functional\MPI\BroadcastReduceTest\BroadcastReduceTest.cs" /> + <Compile Include="Functional\MPI\BroadcastReduceTest\MasterTask.cs" /> + <Compile Include="Functional\MPI\BroadcastReduceTest\SlaveTask.cs" /> + <Compile Include="Functional\MPI\MpiTestConfig.cs" /> + <Compile Include="Functional\MPI\MpiTestConstants.cs" /> + <Compile Include="Functional\MPI\ScatterReduceTest\MasterTask.cs" /> + <Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceDriver.cs" /> + <Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceTest.cs" /> + <Compile Include="Functional\MPI\ScatterReduceTest\SlaveTask.cs" /> <Compile Include="Functional\ReefFunctionalTest.cs" /> <Compile Include="Network\BlockingCollectionExtensionTests.cs" /> <Compile Include="Network\NameServerTests.cs" />
