[REEF-249] Adding Network.Examples for Group Communication This PR is to add a sampel client for Group Communication so that we can run it on Yarn env.
- It refactored GroupComunication tests, moved drivers and tasks into Network.Examples so that the code can be shared by bath test and client - It also added Network.Example.Client as a console app. It contains driver configuration for group communication samples. The app can run on Yarn without any VS and test framework dependency. - Original folder structure of group communication tests are simplified and MPI is renamed as Group. - Client project is updated to reference jar file and dlls from its own binary folder to be consistent with other client projects. JIRA: REEF-249. (https://issues.apache.org/jira/browse/REEF-249) This closes #143 Author: Julia Wang Email: [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c85d45a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c85d45a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c85d45a2 Branch: refs/heads/master Commit: c85d45a27ef897af1b8214471333d9b17fd43134 Parents: 3686317 Author: Julia Wang <[email protected]> Authored: Mon Apr 13 17:17:21 2015 -0700 Committer: Beysim Sezgin <[email protected]> Committed: Tue Apr 14 11:00:38 2015 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Client/CLRBridgeClient.cs | 8 +- .../Org.Apache.REEF.Client.csproj | 15 +- lang/cs/Org.Apache.REEF.Client/run.cmd | 2 +- .../Bridge/ClrHandlerHelper.cs | 2 +- .../BroadcastAndReduceClient.cs | 85 +++++ ...g.Apache.REEF.Network.Examples.Client.csproj | 116 +++++++ .../PipelineBroadcastAndReduceClient.cs | 86 +++++ .../Properties/AssemblyInfo.cs | 55 ++++ .../Run.cs | 58 ++++ .../run.cmd | 45 +++ .../BroadcastReduceDriver.cs | 174 ++++++++++ .../BroadcastReduceDriverAndTasks/MasterTask.cs | 89 ++++++ .../BroadcastReduceDriverAndTasks/SlaveTask.cs | 80 +++++ .../GroupCommunication/GroupTestConfig.cs | 56 ++++ .../GroupCommunication/GroupTestConstants.cs | 36 +++ .../PipelinedBroadcastReduceDriver.cs | 320 +++++++++++++++++++ .../PipelinedMasterTask.cs | 102 ++++++ .../PipelinedSlaveTask.cs | 89 ++++++ .../ScatterReduceDriverAndTasks/MasterTask.cs | 71 ++++ .../ScatterReduceDriver.cs | 157 +++++++++ .../ScatterReduceDriverAndTasks/SlaveTask.cs | 67 ++++ .../Org.Apache.REEF.Network.Examples.csproj | 96 ++++++ .../Properties/AssemblyInfo.cs | 53 +++ .../ConfigFiles/evaluator.conf | Bin 2837 -> 0 bytes .../Functional/Group/BroadcastReduceTest.cs | 110 +++++++ .../Group/PipelinedBroadcastReduceTest.cs | 112 +++++++ .../Functional/Group/ScatterReduceTest.cs | 107 +++++++ .../BroadcastReduceDriver.cs | 177 ---------- .../BroadcastReduceTest/BroadcastReduceTest.cs | 109 ------- .../MPI/BroadcastReduceTest/MasterTask.cs | 89 ------ .../MPI/BroadcastReduceTest/SlaveTask.cs | 80 ----- .../Functional/MPI/MpiTestConfig.cs | 56 ---- .../Functional/MPI/MpiTestConstants.cs | 36 --- .../PipelinedBroadcastReduceDriver.cs | 320 ------------------- .../PipelinedBroadcastReduceTest.cs | 110 ------- .../PipelinedMasterTask.cs | 102 ------ .../PipelinedSlaveTask.cs | 89 ------ .../MPI/ScatterReduceTest/MasterTask.cs | 71 ---- .../ScatterReduceTest/ScatterReduceDriver.cs | 162 ---------- .../MPI/ScatterReduceTest/ScatterReduceTest.cs | 104 ------ .../MPI/ScatterReduceTest/SlaveTask.cs | 67 ---- .../Org.Apache.REEF.Tests.csproj | 24 +- lang/cs/Org.Apache.REEF.Tests/run.cmd | 2 +- lang/cs/Org.Apache.REEF.sln | Bin 42590 -> 24040 bytes 44 files changed, 2192 insertions(+), 1597 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs b/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs index 5ec77c6..f6d8f82 100644 --- a/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs +++ b/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs @@ -36,8 +36,8 @@ namespace Org.Apache.REEF.Client public class CLRBridgeClient { public const string ReefHome = "REEF_HOME"; - public const string DefaultClrFolder = @"lang\java\reef-bridge-java\dotnetHello"; - public const string DefaultReefJar = @"lang\java\reef-bridge-java\target\" + Constants.JavaBridgeJarFileName; + public const string DefaultClrFolder = "."; + public const string DefaultReefJar = Constants.JavaBridgeJarFileName; public const string DefaultRunCommand = "run.cmd"; private static string _clrFolder = null; @@ -116,12 +116,12 @@ namespace Org.Apache.REEF.Client if (string.IsNullOrWhiteSpace(_reefJar)) { - _reefJar = Path.Combine(Environment.GetEnvironmentVariable(ReefHome), DefaultReefJar); + _reefJar = DefaultReefJar; } if (string.IsNullOrWhiteSpace(_clrFolder)) { - _clrFolder = Path.Combine(Environment.GetEnvironmentVariable(ReefHome), DefaultClrFolder); + _clrFolder = DefaultClrFolder; } // Configurable driver submission settings: http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj index b7173c3..69c939a 100644 --- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj +++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj @@ -89,10 +89,14 @@ under the License. <Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project> <Name>Org.Apache.REEF.Examples</Name> </ProjectReference> - <ProjectReference Include="..\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj"> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj"> <Project>{4e69d40a-26d6-4d4a-b96d-729946c07fe1}</Project> <Name>Org.Apache.REEF.Bridge</Name> </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Evaluator\Org.Apache.REEF.Evaluator.csproj"> + <Project>{1b983182-9c30-464c-948d-f87eb93a8240}</Project> + <Name>Org.Apache.REEF.Evaluator</Name> + </ProjectReference> </ItemGroup> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> @@ -102,6 +106,15 @@ under the License. </PropertyGroup> <Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" /> </Target> + <!--begin jar reference--> + <PropertyGroup> + <AfterBuildDependsOn> + $(AfterBuildDependsOn); + CopyJarFiles; + </AfterBuildDependsOn> + </PropertyGroup> + <Target Name="AfterBuild" DependsOnTargets="$(AfterBuildDependsOn);" /> + <!--end jar reference--> <!-- To modify your build process, add your task inside one of the targets below and uncomment it. Other similar extension points exist, see Microsoft.Common.targets. <Target Name="BeforeBuild"> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Client/run.cmd ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Client/run.cmd b/lang/cs/Org.Apache.REEF.Client/run.cmd index 3b64520..ae69a5f 100644 --- a/lang/cs/Org.Apache.REEF.Client/run.cmd +++ b/lang/cs/Org.Apache.REEF.Client/run.cmd @@ -33,7 +33,7 @@ :: RUNTIME -set SHADED_JAR=%REEF_HOME%\lang\reef-bridge\target\reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar +set SHADED_JAR=%REEF_HOME%\lang\reef-bridge\target\reef-bridge-java-0.11.0-incubating-SNAPSHOT-shaded.jar set LOGGING_CONFIG=-Djava.util.logging.config.class=org.apache.reef.util.logging.CLRLoggingConfig http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs index bce5ce4..6ef87bb 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs @@ -40,7 +40,7 @@ namespace Org.Apache.REEF.Driver.Bridge { get { - return new[] { "Microsoft.Hadoop.Avro.dll", "Org.Apache.REEF.Driver.dll", "Org.Apache.REEF.Common.dll", "Org.Apache.REEF.Utilities.dll", "Org.Apache.REEF.Network.dll", "Org.Apache.REEF.Tang.dll", "Org.Apache.REEF.Wake.dll", "Newtonsoft.Json.dll", "protobuf-net.dll" }; + return new[] { "Microsoft.Hadoop.Avro.dll", "Org.Apache.REEF.Driver.dll", "Org.Apache.REEF.Common.dll", "Org.Apache.REEF.Utilities.dll", "Org.Apache.REEF.Network.dll", "Org.Apache.REEF.Tang.dll", "Org.Apache.REEF.Wake.dll", "Org.Apache.REEF.Bridge.dll", "Newtonsoft.Json.dll", "protobuf-net.dll" }; } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs new file mode 100644 index 0000000..0db6ce0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Globalization; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Network.Examples.GroupCommunication; +using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Network.Examples.Client +{ + class BroadcastAndReduceClient + { + public void RunBroadcastAndReduce(bool runOnYarn, int numTasks) + { + const int numIterations = 10; + const string driverId = "BroadcastReduceDriver"; + const string groupName = "BroadcastReduceGroup"; + const string masterTaskId = "MasterTask"; + const int fanOut = 2; + + IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( + DriverBridgeConfiguration.ConfigurationModule + .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build()) + .BindNamedParameter<GroupTestConfig.NumIterations, int>( + GenericType<GroupTestConfig.NumIterations>.Class, + numIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<MpiConfigurationOptions.DriverId>(driverId) + .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(masterTaskId) + .BindStringNamedParam<MpiConfigurationOptions.GroupName>(groupName) + .BindIntNamedParam<MpiConfigurationOptions.FanOut>(fanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); + + HashSet<string> appDlls = new HashSet<string>(); + appDlls.Add(typeof(IDriver).Assembly.GetName().Name); + appDlls.Add(typeof(ITask).Assembly.GetName().Name); + appDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name); + appDlls.Add(typeof(INameClient).Assembly.GetName().Name); + appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + ClrClientHelper.Run(appDlls, merged, new DriverSubmissionSettings() { RunOnYarn = runOnYarn, JavaLogLevel = JavaLoggingSetting.VERBOSE }); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/Org.Apache.REEF.Network.Examples.Client.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Org.Apache.REEF.Network.Examples.Client.csproj b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Org.Apache.REEF.Network.Examples.Client.csproj new file mode 100644 index 0000000..6a65ae5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Org.Apache.REEF.Network.Examples.Client.csproj @@ -0,0 +1,116 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <ProjectGuid>{E6DA8BC4-A346-48A7-99E3-D47ADD7DB975}</ProjectGuid> + <OutputType>Exe</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.REEF.Network.Examples.Client</RootNamespace> + <AssemblyName>Org.Apache.REEF.Network.Examples.Client</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir> + <RestorePackages>true</RestorePackages> + </PropertyGroup> + <Import Project="$(SolutionDir)\build.props" /> + <PropertyGroup> + <BuildPackage>false</BuildPackage> + <UseVSHostingProcess>false</UseVSHostingProcess> + </PropertyGroup> + <ItemGroup> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="BroadcastAndReduceClient.cs" /> + <Compile Include="PipelineBroadcastAndReduceClient.cs" /> + <Compile Include="Run.cs" /> + <Compile Include="Properties\AssemblyInfo.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="run.cmd"> + <CopyToOutputDirectory>Always</CopyToOutputDirectory> + </None> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj"> + <Project>{4e69d40a-26d6-4d4a-b96d-729946c07fe1}</Project> + <Name>Org.Apache.REEF.Bridge</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj"> + <Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project> + <Name>Org.Apache.REEF.Client</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj"> + <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project> + <Name>Org.Apache.REEF.Common</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj"> + <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project> + <Name>Org.Apache.REEF.Driver</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Evaluator\Org.Apache.REEF.Evaluator.csproj"> + <Project>{1b983182-9c30-464c-948d-f87eb93a8240}</Project> + <Name>Org.Apache.REEF.Evaluator</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Network.Examples\Org.Apache.REEF.Network.Examples.csproj"> + <Project>{b1b43b60-ddd0-4805-a9b4-ba84a0ccb7c7}</Project> + <Name>Org.Apache.REEF.Network.Examples</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj"> + <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project> + <Name>Org.Apache.REEF.Network</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + <Name>Org.Apache.REEF.Tang</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj"> + <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project> + <Name>Org.Apache.REEF.Utilities</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj"> + <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> + <Name>Org.Apache.REEF.Wake</Name> + </ProjectReference> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> + <Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" /> + <!--begin jar reference--> + <PropertyGroup> + <AfterBuildDependsOn> + $(AfterBuildDependsOn); + CopyJarFiles; + </AfterBuildDependsOn> + </PropertyGroup> + <Target Name="AfterBuild" DependsOnTargets="$(AfterBuildDependsOn);" /> + <!--end jar reference--> + <!-- To modify your build process, add your task inside one of the targets below and uncomment it. + Other similar extension points exist, see Microsoft.Common.targets. + <Target Name="BeforeBuild"> + </Target> + <Target Name="AfterBuild"> + </Target> + --> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs new file mode 100644 index 0000000..570b227 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Network.Examples.GroupCommunication; +using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks; +using Org.Apache.REEF.Network.Group.Config; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Network.Examples.Client +{ + public class PipelineBroadcastAndReduceClient + { + public void RunPipelineBroadcastAndReduce(bool runOnYarn, int numTasks) + { + IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder( + DriverBridgeConfiguration.ConfigurationModule + .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class) + .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString()) + .Build()) + .BindNamedParameter<GroupTestConfig.NumIterations, int>( + GenericType<GroupTestConfig.NumIterations>.Class, + GroupTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + numTasks.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.ChunkSize, int>( + GenericType<GroupTestConfig.ChunkSize>.Class, + GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId) + .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId) + .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName) + .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture).ToString(CultureInfo.InvariantCulture)) + .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture)) + .Build(); + + IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig); + + HashSet<string> appDlls = new HashSet<string>(); + appDlls.Add(typeof(IDriver).Assembly.GetName().Name); + appDlls.Add(typeof(ITask).Assembly.GetName().Name); + appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); + appDlls.Add(typeof(INameClient).Assembly.GetName().Name); + appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + ClrClientHelper.Run(appDlls, merged, new DriverSubmissionSettings() { RunOnYarn = runOnYarn, JavaLogLevel = JavaLoggingSetting.VERBOSE }); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..9716526 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Properties/AssemblyInfo.cs @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Org.Apache.REEF.Tests.Yarn")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Org.Apache.REEF.Tests.Yarn")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("22ae839f-8ae7-46ac-b4bd-6d0d32213d83")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs new file mode 100644 index 0000000..899e548 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; + +namespace Org.Apache.REEF.Network.Examples.Client +{ + public class Run + { + static void Main(string[] args) + { + Console.WriteLine("start running client: " + DateTime.Now); + bool runOnYarn = false; + List<string> testToRun = new List<string>(); + if (args != null) + { + if (args.Length > 0) + { + runOnYarn = bool.Parse(args[0].ToLower()); + } + + for (int i = 1; i < args.Length; i++) + { + testToRun.Add(args[i].ToLower()); + } + } + + if (testToRun.Contains("RunPipelineBroadcastAndReduce".ToLower()) || testToRun.Contains("all") || testToRun.Count == 0) + { + new PipelineBroadcastAndReduceClient().RunPipelineBroadcastAndReduce(runOnYarn, 9); + Console.WriteLine("RunPipelineBroadcastAndReduce completed!!!"); + } + + if (testToRun.Contains("RunBroadcastAndReduce".ToLower()) || testToRun.Contains("all") || testToRun.Count == 0) + { + new BroadcastAndReduceClient().RunBroadcastAndReduce(runOnYarn, 9); + Console.WriteLine("RunBroadcastAndReduce completed!!!"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/run.cmd ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/run.cmd b/lang/cs/Org.Apache.REEF.Network.Examples.Client/run.cmd new file mode 100644 index 0000000..bfdd44e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/run.cmd @@ -0,0 +1,45 @@ +@REM +@REM Copyright (C) 2013 Microsoft Corporation +@REM +@REM Licensed under the Apache License, Version 2.0 (the "License"); +@REM you may not use this file except in compliance with the License. +@REM You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, software +@REM distributed under the License is distributed on an "AS IS" BASIS, +@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@REM See the License for the specific language governing permissions and +@REM limitations under the License. +@REM + +@echo off +:: +:: Copyright (C) 2013 Microsoft Corporation +:: +:: Licensed under the Apache License, Version 2.0 (the "License"); +:: you may not use this file except in compliance with the License. +:: You may obtain a copy of the License at +:: +:: http:\\www.apache.org\licenses\LICENSE-2.0 +:: +:: Unless required by applicable law or agreed to in writing, software +:: distributed under the License is distributed on an "AS IS" BASIS, +:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +:: See the License for the specific language governing permissions and +:: limitations under the License. +:: + + +:: RUNTIME +set SHADED_JAR=.\reef-bridge-java-0.11.0-incubating-SNAPSHOT-shaded.jar + +set LOGGING_CONFIG=-Djava.util.logging.config.class=org.apache.reef.util.logging.Config + +set CLASSPATH=%HADOOP_HOME%\share\hadoop\hdfs\lib\*;%HADOOP_HOME%\share\hadoop\hdfs\*;%HADOOP_HOME%\share\hadoop\common\*;%HADOOP_HOME%\share\hadoop\common\lib\*;%HADOOP_HOME%\share\hadoop\mapreduce\lib\*;%HADOOP_HOME%\share\hadoop\mapreduce\*;%HADOOP_HOME%\share\hadoop\yarn\*;%HADOOP_HOME%\share\hadoop\yarn\lib\* + +set CMD=%JAVA_HOME%\bin\java.exe -cp %HADOOP_HOME%\etc\hadoop;%SHADED_JAR%;%CLASSPATH% %* +::%LOGGING_CONFIG% +echo %CMD% +%CMD% http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs new file mode 100644 index 0000000..c6b8578 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Network.Group.Driver; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks +{ + public class BroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(BroadcastReduceDriver)); + + private readonly int _numEvaluators; + private readonly int _numIterations; + + private readonly IMpiDriver _mpiDriver; + private readonly ICommunicationGroupDriver _commGroup; + private readonly TaskStarter _mpiTaskStarter; + + [Inject] + public BroadcastReduceDriver( + [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, + [Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations, + MpiDriver mpiDriver) + { + Identifier = "BroadcastStartHandler"; + _numEvaluators = numEvaluators; + _numIterations = numIterations; + _mpiDriver = mpiDriver; + _commGroup = _mpiDriver.DefaultGroup + .AddBroadcast<int, IntCodec>( + GroupTestConstants.BroadcastOperatorName, + GroupTestConstants.MasterTaskId) + .AddReduce<int, IntCodec>( + GroupTestConstants.ReduceOperatorName, + GroupTestConstants.MasterTaskId, + new SumFunction()) + .Build(); + + _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); + + CreateClassHierarchy(); + } + + public string Identifier { get; set; } + + public void OnNext(IEvaluatorRequestor evaluatorRequestor) + { + EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"); + evaluatorRequestor.Submit(request); + } + + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); + IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); + allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); + } + + public void OnNext(IActiveContext activeContext) + { + if (_mpiDriver.IsMasterTaskContext(activeContext)) + { + // Configure Master Task + IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, GroupTestConstants.MasterTaskId) + .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class) + .Build()) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + _numEvaluators.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.NumIterations, int>( + GenericType<GroupTestConfig.NumIterations>.Class, + _numIterations.ToString(CultureInfo.InvariantCulture)) + .Build(); + + _commGroup.AddTask(GroupTestConstants.MasterTaskId); + _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + } + else + { + // Configure Slave Task + string slaveTaskId = "SlaveTask-" + activeContext.Id; + IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, slaveTaskId) + .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class) + .Build()) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + _numEvaluators.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.NumIterations, int>( + GenericType<GroupTestConfig.NumIterations>.Class, + _numIterations.ToString(CultureInfo.InvariantCulture)) + .Build(); + + _commGroup.AddTask(slaveTaskId); + _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + } + } + + public void OnNext(IFailedEvaluator value) + { + } + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + + private void CreateClassHierarchy() + { + HashSet<string> clrDlls = new HashSet<string>(); + clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); + clrDlls.Add(typeof(ITask).Assembly.GetName().Name); + clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name); + clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); + clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + ClrHandlerHelper.GenerateClassHierarchy(clrDlls); + } + + private class SumFunction : IReduceFunction<int> + { + [Inject] + public SumFunction() + { + } + + public int Reduce(IEnumerable<int> elements) + { + return elements.Sum(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs new file mode 100644 index 0000000..1ca2353 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks +{ + public class MasterTask : ITask + { + private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask)); + + private readonly int _numIters; + private readonly int _numReduceSenders; + + private readonly IMpiClient _mpiClient; + private readonly ICommunicationGroupClient _commGroup; + private readonly IBroadcastSender<int> _broadcastSender; + private readonly IReduceReceiver<int> _sumReducer; + + [Inject] + public MasterTask( + [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, + [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, + IMpiClient mpiClient) + { + _logger.Log(Level.Info, "Hello from master task"); + _numIters = numIters; + _numReduceSenders = numEvaluators - 1; + _mpiClient = mpiClient; + + _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _broadcastSender = _commGroup.GetBroadcastSender<int>(GroupTestConstants.BroadcastOperatorName); + _sumReducer = _commGroup.GetReduceReceiver<int>(GroupTestConstants.ReduceOperatorName); + } + + public byte[] Call(byte[] memento) + { + for (int i = 1; i <= _numIters; i++) + { + // Each slave task calculates the nth triangle number + _broadcastSender.Send(i); + + // Sum up all of the calculated triangle numbers + int sum = _sumReducer.Reduce(); + _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i); + + int expected = TriangleNumber(i) * _numReduceSenders; + if (sum != TriangleNumber(i) * _numReduceSenders) + { + throw new Exception("Expected " + expected + " but got " + sum); + } + } + + return null; + } + + public void Dispose() + { + _mpiClient.Dispose(); + } + + private int TriangleNumber(int n) + { + return Enumerable.Range(1, n).Sum(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs new file mode 100644 index 0000000..c09bb80 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks +{ + public class SlaveTask : ITask + { + private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask)); + + private readonly int _numIterations; + private readonly IMpiClient _mpiClient; + private readonly ICommunicationGroupClient _commGroup; + private readonly IBroadcastReceiver<int> _broadcastReceiver; + private readonly IReduceSender<int> _triangleNumberSender; + + [Inject] + public SlaveTask( + [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, + IMpiClient mpiClient) + { + _logger.Log(Level.Info, "Hello from slave task"); + + _numIterations = numIters; + _mpiClient = mpiClient; + _commGroup = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _broadcastReceiver = _commGroup.GetBroadcastReceiver<int>(GroupTestConstants.BroadcastOperatorName); + _triangleNumberSender = _commGroup.GetReduceSender<int>(GroupTestConstants.ReduceOperatorName); + } + + public byte[] Call(byte[] memento) + { + for (int i = 0; i < _numIterations; i++) + { + // Receive n from Master Task + int n = _broadcastReceiver.Receive(); + _logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", n); + + // Calculate the nth Triangle number and send it back to driver + int triangleNum = TriangleNumber(n); + _logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i); + _triangleNumberSender.Send(triangleNum); + } + + return null; + } + + public void Dispose() + { + _mpiClient.Dispose(); + } + + private int TriangleNumber(int n) + { + return Enumerable.Range(1, n).Sum(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs new file mode 100644 index 0000000..c7d93c1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication +{ + public class GroupTestConfig + { + [NamedParameter("Number of iterations of messages to send")] + public class NumIterations : Name<int> + { + } + + [NamedParameter("Number of Evaluators")] + public class NumEvaluators : Name<int> + { + } + + [NamedParameter("tree width")] + public class FanOut : Name<int> + { + } + + [NamedParameter("integer id of the evaluator")] + public class EvaluatorId : Name<string> + { + } + + [NamedParameter("Size of the array")] + public class ArraySize : Name<int> + { + } + + [NamedParameter("Chunk size for pipelining")] + public class ChunkSize : Name<int> + { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConstants.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConstants.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConstants.cs new file mode 100644 index 0000000..cbfebfb --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConstants.cs @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication +{ + public class GroupTestConstants + { + public const string DriverId = "BroadcastReduceDriver"; + public const string GroupName = "BroadcastReduceGroup"; + public const string BroadcastOperatorName = "Broadcast"; + public const string ReduceOperatorName = "Reduce"; + public const string ScatterOperatorName = "Scatter"; + public const string MasterTaskId = "MasterTask"; + public const string SlaveTaskId = "SlaveTask-"; + public const int NumIterations = 10; + public const int FanOut = 2; + public const int ChunkSize = 2; + public const int ArrayLength = 6; + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs new file mode 100644 index 0000000..d108d68 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Linq; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Network.Group.Driver; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Network.Group.Topology; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks +{ + public class PipelinedBroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver)); + + private readonly int _numEvaluators; + private readonly int _numIterations; + private readonly int _chunkSize; + + private readonly IMpiDriver _mpiDriver; + private readonly ICommunicationGroupDriver _commGroup; + private readonly TaskStarter _mpiTaskStarter; + + [Inject] + public PipelinedBroadcastReduceDriver( + [Parameter(typeof (GroupTestConfig.NumEvaluators))] int numEvaluators, + [Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations, + [Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize, + MpiDriver mpiDriver) + { + Logger.Log(Level.Info, "*******entering the driver code " + chunkSize); + + Identifier = "BroadcastStartHandler"; + _numEvaluators = numEvaluators; + _numIterations = numIterations; + _chunkSize = chunkSize; + + _mpiDriver = mpiDriver; + + _commGroup = _mpiDriver.DefaultGroup + .AddBroadcast<int[], IntArrayCodec>( + GroupTestConstants.BroadcastOperatorName, + GroupTestConstants.MasterTaskId, + TopologyTypes.Tree, + new PipelineIntDataConverter(_chunkSize)) + .AddReduce<int[], IntArrayCodec>( + GroupTestConstants.ReduceOperatorName, + GroupTestConstants.MasterTaskId, + new ArraySumFunction(), + TopologyTypes.Tree, + new PipelineIntDataConverter(_chunkSize)) + .Build(); + + _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); + + CreateClassHierarchy(); + } + + public string Identifier { get; set; } + + public void OnNext(IEvaluatorRequestor evaluatorRequestor) + { + EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"); + evaluatorRequestor.Submit(request); + } + + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); + IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); + allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); + } + + public void OnNext(IActiveContext activeContext) + { + if (_mpiDriver.IsMasterTaskContext(activeContext)) + { + Logger.Log(Level.Info, "******* Master ID " + activeContext.Id ); + + // Configure Master Task + IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, GroupTestConstants.MasterTaskId) + .Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class) + .Build()) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + _numEvaluators.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.NumIterations, int>( + GenericType<GroupTestConfig.NumIterations>.Class, + _numIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.ArraySize, int>( + GenericType<GroupTestConfig.ArraySize>.Class, + GroupTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture)) + .Build(); + + _commGroup.AddTask(GroupTestConstants.MasterTaskId); + _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + } + else + { + // Configure Slave Task + string slaveTaskId = "SlaveTask-" + activeContext.Id; + IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder( + TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, slaveTaskId) + .Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class) + .Build()) + .BindNamedParameter<GroupTestConfig.NumEvaluators, int>( + GenericType<GroupTestConfig.NumEvaluators>.Class, + _numEvaluators.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.NumIterations, int>( + GenericType<GroupTestConfig.NumIterations>.Class, + _numIterations.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter<GroupTestConfig.ArraySize, int>( + GenericType<GroupTestConfig.ArraySize>.Class, + GroupTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture)) + .Build(); + + _commGroup.AddTask(slaveTaskId); + _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + } + } + + public void OnNext(IFailedEvaluator value) + { + } + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + + private void CreateClassHierarchy() + { + HashSet<string> clrDlls = new HashSet<string>(); + clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); + clrDlls.Add(typeof(ITask).Assembly.GetName().Name); + clrDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name); + clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); + clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + ClrHandlerHelper.GenerateClassHierarchy(clrDlls); + } + + private class SumFunction : IReduceFunction<int> + { + [Inject] + public SumFunction() + { + } + + public int Reduce(IEnumerable<int> elements) + { + return elements.Sum(); + } + } + + private class ArraySumFunction : IReduceFunction<int[]> + { + [Inject] + public ArraySumFunction() + { + } + + public int[] Reduce(IEnumerable<int[]> elements) + { + int[] result = null; + int count = 0; + + foreach (var element in elements) + { + if (count == 0) + { + result = element.Clone() as int[]; + } + else + { + if (element.Length != result.Length) + { + throw new Exception("integer arrays are of different sizes"); + } + + for (int i = 0; i < result.Length; i++) + { + result[i] += element[i]; + } + } + + count++; + } + + return result; + } + } + + + private class IntArrayCodec : ICodec<int[]> + { + [Inject] + public IntArrayCodec() + { + } + + public byte[] Encode(int[] obj) + { + byte[] result = new byte[sizeof(Int32) * obj.Length]; + Buffer.BlockCopy(obj, 0, result, 0, result.Length); + return result; + } + + public int[] Decode(byte[] data) + { + if (data.Length % sizeof(Int32) != 0) + { + throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size"); + } + + int[] result = new int[data.Length / sizeof(Int32)]; + Buffer.BlockCopy(data, 0, result, 0, data.Length); + return result; + } + } + + public class PipelineIntDataConverter : IPipelineDataConverter<int[]> + { + readonly int _chunkSize; + + [Inject] + public PipelineIntDataConverter([Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize) + { + _chunkSize = chunkSize; + } + + public List<PipelineMessage<int[]>> PipelineMessage(int[] message) + { + List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>(); + int totalChunks = message.Length / _chunkSize; + + if (message.Length % _chunkSize != 0) + { + totalChunks++; + } + + int counter = 0; + for (int i = 0; i < message.Length; i += _chunkSize) + { + int[] data = new int[Math.Min(_chunkSize, message.Length - i)]; + Buffer.BlockCopy(message, i * sizeof(int), data, 0, data.Length * sizeof(int)); + + messageList.Add(counter == totalChunks - 1 + ? new PipelineMessage<int[]>(data, true) + : new PipelineMessage<int[]>(data, false)); + + counter++; + } + + return messageList; + } + + public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage) + { + int size = pipelineMessage.Select(x => x.Data.Length).Sum(); + int[] data = new int[size]; + int offset = 0; + + foreach (var message in pipelineMessage) + { + Buffer.BlockCopy(message.Data, 0, data, offset, message.Data.Length * sizeof(int)); + offset += message.Data.Length * sizeof(int); + } + + return data; + } + + public IConfiguration GetConfiguration() + { + return TangFactory.GetTang().NewConfigurationBuilder() + .BindNamedParameter<GroupTestConfig.ChunkSize, int>(GenericType<GroupTestConfig.ChunkSize>.Class, _chunkSize.ToString(CultureInfo.InvariantCulture)) + .Build(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs new file mode 100644 index 0000000..0e4ccee --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks +{ + public class PipelinedMasterTask : ITask + { + private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedMasterTask)); + + private readonly int _numIters; + private readonly int _numReduceSenders; + private readonly int _arraySize; + + private readonly IMpiClient _mpiClient; + private readonly ICommunicationGroupClient _commGroup; + private readonly IBroadcastSender<int[]> _broadcastSender; + private readonly IReduceReceiver<int[]> _sumReducer; + + [Inject] + public PipelinedMasterTask( + [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, + [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, + [Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize, + IMpiClient mpiClient) + { + Logger.Log(Level.Info, "Hello from master task"); + _numIters = numIters; + _numReduceSenders = numEvaluators - 1; + _arraySize = arraySize; + _mpiClient = mpiClient; + + _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _broadcastSender = _commGroup.GetBroadcastSender<int[]>(GroupTestConstants.BroadcastOperatorName); + _sumReducer = _commGroup.GetReduceReceiver<int[]>(GroupTestConstants.ReduceOperatorName); + Logger.Log(Level.Info, "finished master task constructor"); + } + + public byte[] Call(byte[] memento) + { + int[] intArr = new int[_arraySize]; + + for (int i = 1; i <= _numIters; i++) + { + for (int j = 0; j < _arraySize; j++) + { + intArr[j] = i; + } + + _broadcastSender.Send(intArr); + int[] sum = _sumReducer.Reduce(); + + Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i); + + int expected = TriangleNumber(i) * _numReduceSenders; + + for (int j = 0; j < intArr.Length; j++) + { + if (sum[j] != TriangleNumber(i) * _numReduceSenders) + { + throw new Exception("Expected " + expected + " but got " + sum); + } + } + } + + return null; + } + + public void Dispose() + { + _mpiClient.Dispose(); + } + + private int TriangleNumber(int n) + { + return Enumerable.Range(1, n).Sum(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs new file mode 100644 index 0000000..503e6e3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks +{ + public class PipelinedSlaveTask : ITask + { + private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedSlaveTask)); + + private readonly int _numIterations; + private readonly IMpiClient _mpiClient; + private readonly ICommunicationGroupClient _commGroup; + private readonly IBroadcastReceiver<int[]> _broadcastReceiver; + private readonly IReduceSender<int[]> _triangleNumberSender; + + [Inject] + public PipelinedSlaveTask( + [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters, + IMpiClient mpiClient) + { + Logger.Log(Level.Info, "Hello from slave task"); + + _numIterations = numIters; + _mpiClient = mpiClient; + _commGroup = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _broadcastReceiver = _commGroup.GetBroadcastReceiver<int[]>(GroupTestConstants.BroadcastOperatorName); + _triangleNumberSender = _commGroup.GetReduceSender<int[]>(GroupTestConstants.ReduceOperatorName); + } + + public byte[] Call(byte[] memento) + { + for (int i = 0; i < _numIterations; i++) + { + // Receive n from Master Task + int[] intVec = _broadcastReceiver.Receive(); + + Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", intVec[0]); + + // Calculate the nth Triangle number and send it back to driver + int triangleNum = TriangleNumber(intVec[0]); + Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangleNum, i); + + int[] resArr = new int[intVec.Length]; + + for (int j = 0; j < resArr.Length; j++) + { + resArr[j] = triangleNum; + } + + _triangleNumberSender.Send(resArr); + } + + return null; + } + + public void Dispose() + { + _mpiClient.Dispose(); + } + + private int TriangleNumber(int n) + { + return Enumerable.Range(1, n).Sum(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs new file mode 100644 index 0000000..a3d17d7 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks +{ + public class MasterTask : ITask + { + private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask)); + + private readonly IMpiClient _mpiClient; + private readonly ICommunicationGroupClient _commGroup; + private readonly IScatterSender<int> _scatterSender; + private readonly IReduceReceiver<int> _sumReducer; + + [Inject] + public MasterTask(IMpiClient mpiClient) + { + _logger.Log(Level.Info, "Hello from master task"); + _mpiClient = mpiClient; + + _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName); + _scatterSender = _commGroup.GetScatterSender<int>(GroupTestConstants.ScatterOperatorName); + _sumReducer = _commGroup.GetReduceReceiver<int>(GroupTestConstants.ReduceOperatorName); + } + + public byte[] Call(byte[] memento) + { + List<int> data = Enumerable.Range(1, 100).ToList(); + _scatterSender.Send(data); + + int sum = _sumReducer.Reduce(); + _logger.Log(Level.Info, "Received sum: {0}", sum); + + return null; + } + + public void Dispose() + { + _mpiClient.Dispose(); + } + + private List<string> GetScatterOrder() + { + return new List<string> { "SlaveTask-4", "SlaveTask-3", "SlaveTask-2", "SlaveTask-1" }; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs new file mode 100644 index 0000000..fab12e5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Common.Io; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Bridge; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Network.Group.Driver; +using Org.Apache.REEF.Network.Group.Driver.Impl; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Network.Group.Topology; +using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks +{ + public class ScatterReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(ScatterReduceDriver)); + + private readonly int _numEvaluators; + + private readonly IMpiDriver _mpiDriver; + private readonly ICommunicationGroupDriver _commGroup; + private readonly TaskStarter _mpiTaskStarter; + + [Inject] + public ScatterReduceDriver( + [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators, + MpiDriver mpiDriver) + { + Identifier = "BroadcastStartHandler"; + _numEvaluators = numEvaluators; + _mpiDriver = mpiDriver; + _commGroup = _mpiDriver.DefaultGroup + .AddScatter<int, IntCodec>( + GroupTestConstants.ScatterOperatorName, + GroupTestConstants.MasterTaskId, + TopologyTypes.Tree) + .AddReduce<int, IntCodec>( + GroupTestConstants.ReduceOperatorName, + GroupTestConstants.MasterTaskId, + new SumFunction()) + .Build(); + + _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators); + + CreateClassHierarchy(); + } + + public string Identifier { get; set; } + + public void OnNext(IEvaluatorRequestor evaluatorRequestor) + { + EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"); + evaluatorRequestor.Submit(request); + } + + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + IConfiguration contextConf = _mpiDriver.GetContextConfiguration(); + IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration(); + allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); + } + + public void OnNext(IActiveContext activeContext) + { + if (_mpiDriver.IsMasterTaskContext(activeContext)) + { + // Configure Master Task + IConfiguration partialTaskConf = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, GroupTestConstants.MasterTaskId) + .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class) + .Build(); + + _commGroup.AddTask(GroupTestConstants.MasterTaskId); + _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + } + else + { + // Configure Slave Task + string slaveTaskId = GroupTestConstants.SlaveTaskId + + _mpiDriver.GetContextNum(activeContext); + + IConfiguration partialTaskConf = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, slaveTaskId) + .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class) + .Build(); + + _commGroup.AddTask(slaveTaskId); + _mpiTaskStarter.QueueTask(partialTaskConf, activeContext); + } + } + + public void OnNext(IFailedEvaluator value) + { + } + + public void OnError(Exception error) + { + } + + public void OnCompleted() + { + } + + private void CreateClassHierarchy() + { + HashSet<string> clrDlls = new HashSet<string>(); + clrDlls.Add(typeof(IDriver).Assembly.GetName().Name); + clrDlls.Add(typeof(ITask).Assembly.GetName().Name); + clrDlls.Add(typeof(ScatterReduceDriver).Assembly.GetName().Name); + clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); + clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); + + ClrHandlerHelper.GenerateClassHierarchy(clrDlls); + } + + private class SumFunction : IReduceFunction<int> + { + [Inject] + public SumFunction() + { + } + + public int Reduce(IEnumerable<int> elements) + { + return elements.Sum(); + } + } + } +}
