Repository: incubator-reef Updated Branches: refs/heads/master 15dd78698 -> 5b359ca97
[REEF-427] Add IMRU API and single threaded implementation This adds `Org.Apache.REEF.IMRU.API` which contains the API for IMRU as well as `Org.Apache.REEF.IMRU.InProcess` which is a single-threaded implementation for testing. Also, this change adds a very simple example IMRU program that counts the number of mappers in the job. This example is used for the first test in `Org.Apache.REEF.IMRU.Tests` Allow the tests project access to internal APIs JIRA: [REEF-427](https://issues.apache.org/jira/browse/REEF-427) Pull Request: This closes #269 Author: Markus Weimer <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/5b359ca9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/5b359ca9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/5b359ca9 Branch: refs/heads/master Commit: 5b359ca973b240058c6caf36d8bf6e6a91573d33 Parents: 15dd786 Author: Markus Weimer <[email protected]> Authored: Mon Jun 29 15:39:40 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Thu Jul 2 13:19:14 2015 -0700 ---------------------------------------------------------------------- .../MapperCountTest.cs | 53 +++++++++++ .../Org.Apache.REEF.IMRU.Tests.csproj | 65 ++++++++++++++ .../Properties/AssemblyInfo.cs | 34 ++++++++ lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs | 38 ++++++++ .../API/IMRUConfiguration.cs | 82 +++++++++++++++++ .../API/IMRUJobDefinition.cs | 49 +++++++++++ .../API/IMRUJobDefinitionBuilder.cs | 75 ++++++++++++++++ .../cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs | 41 +++++++++ .../Org.Apache.REEF.IMRU/API/IUpdateFunction.cs | 46 ++++++++++ .../API/Parameters/MapInputCodec.cs | 33 +++++++ .../API/Parameters/MapOutputCodec.cs | 33 +++++++ .../API/Parameters/ResultCodec.cs | 33 +++++++ .../cs/Org.Apache.REEF.IMRU/API/UpdateResult.cs | 87 +++++++++++++++++++ .../Examples/MapperCount/IdentityMapFunction.cs | 40 +++++++++ .../MapperCount/IntSumReduceFunction.cs | 42 +++++++++ .../Examples/MapperCount/MapperCount.cs | 67 ++++++++++++++ .../MapperCount/MapperCountUpdateFunction.cs | 49 +++++++++++ .../InProcess/IMRURunner.cs | 75 ++++++++++++++++ .../InProcess/InProcessIMRUClient.cs | 76 ++++++++++++++++ .../InProcess/InProcessIMRUConfiguration.cs | 47 ++++++++++ .../InProcess/MapFunctions.cs | 45 ++++++++++ .../InProcess/Parameters/NumberOfMappers.cs | 31 +++++++ .../Org.Apache.REEF.IMRU.csproj | 19 ++++ .../Properties/AssemblyInfo.cs | 5 +- lang/cs/Org.Apache.REEF.sln | Bin 25004 -> 26004 bytes 25 files changed, 1164 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs new file mode 100644 index 0000000..42f165f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/MapperCountTest.cs @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.IMRU.Examples.MapperCount; +using Org.Apache.REEF.IMRU.InProcess; +using Org.Apache.REEF.Tang.Implementations.Tang; + +namespace Org.Apache.REEF.IMRU.Tests +{ + /// <summary> + /// Tests of the mapper counting job. + /// </summary> + [TestClass] + public class MapperCountTest + { + private const int NumberOfMappers = 7; + + /// <summary> + /// Tests the mapper counting job using the in-process IMRU implementation. + /// </summary> + [TestMethod] + public void TestMapperCountInProcess() + { + var tested = + TangFactory.GetTang() + .NewInjector( + InProcessIMRUConfiguration<int, int, int>.ConfigurationModule + .Set(InProcessIMRUConfiguration<int, int, int>.NumberOfMappers, NumberOfMappers.ToString()) + .Build() + ) + .GetInstance<MapperCount>(); + var result = tested.Run(); + Assert.AreEqual(NumberOfMappers, result, "The result of the run should be the number of Mappers."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj new file mode 100644 index 0000000..1bf3fdf --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <ProjectGuid>{4B4AF206-7AF6-4BDE-AFA4-416FBD6DCCB6}</ProjectGuid> + <OutputType>Library</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Org.Apache.REEF.IMRU.Tests</RootNamespace> + <AssemblyName>Org.Apache.REEF.IMRU.Tests</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir> + <RestorePackages>true</RestorePackages> + </PropertyGroup> + <Import Project="$(SolutionDir)\build.props" /> + <ItemGroup> + <Reference Include="Microsoft.VisualStudio.QualityTools.UnitTestFramework, Version=10.1.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL" /> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="MapperCountTest.cs" /> + </ItemGroup> + <ItemGroup> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IMRU\Org.Apache.REEF.IMRU.csproj"> + <Project>{cc797c57-b465-4d11-98ac-edaaef5899a6}</Project> + <Name>Org.Apache.REEF.IMRU</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj"> + <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project> + <Name>Org.Apache.REEF.Network</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj"> + <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project> + <Name>Org.Apache.REEF.Tang</Name> + </ProjectReference> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj"> + <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project> + <Name>Org.Apache.REEF.Wake</Name> + </ProjectReference> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU.Tests/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..97b7122 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Reflection; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("Org.Apache.REEF.IMRU.Tests")] +[assembly: AssemblyDescription("Tests for Iterative Map Reduce Update (IMRU) API for REEF")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("The Apache Software Foundation.")] +[assembly: AssemblyProduct("Org.Apache.REEF.IMRU.Tests")] +[assembly: AssemblyCopyright("The Apache Software Foundation")] +[assembly: AssemblyTrademark("The Apache Software Foundation")] +[assembly: AssemblyCulture("")] +[assembly: ComVisible(false)] +[assembly: Guid("e3c78171-f90c-4737-b673-31d053c16313")] +[assembly: AssemblyVersion("0.12.0.0")] +[assembly: AssemblyFileVersion("0.12.0.0")] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs new file mode 100644 index 0000000..29c45cf --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// Job submission interface for IMRU jobs. + /// </summary> + /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + /// <typeparam name="TResult">The return type of the computation.</typeparam> + public interface IIMRUClient<TMapInput, TMapOutput, TResult> + { + /// <summary> + /// Submit the given job for execution. + /// </summary> + /// <param name="jobDefinition"></param> + IEnumerable<TResult> Submit(IMRUJobDefinition jobDefinition); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs new file mode 100644 index 0000000..595f64a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUConfiguration.cs @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.IMRU.API.Parameters; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// A configuration module for IMRU. + /// </summary> + /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + /// <typeparam name="TResult">The return type of the computation.</typeparam> + public sealed class IMRUConfiguration<TMapInput, TMapOutput, TResult> : ConfigurationModuleBuilder + { + /// <summary> + /// The codec to be used for the map input. + /// </summary> + public static readonly RequiredImpl<IStreamingCodec<TMapInput>> MapInputCodec = + new RequiredImpl<IStreamingCodec<TMapInput>>(); + + /// <summary> + /// The codec to be used for the map output. + /// </summary> + public static readonly RequiredImpl<IStreamingCodec<TMapOutput>> MapOutputCodec = + new RequiredImpl<IStreamingCodec<TMapOutput>>(); + + /// <summary> + /// The codec to be used for the result. + /// </summary> + public static readonly RequiredImpl<IStreamingCodec<TResult>> ResultCodec = + new RequiredImpl<IStreamingCodec<TResult>>(); + + /// <summary> + /// The IReduceFunction type to use. + /// </summary> + public static readonly RequiredImpl<IReduceFunction<TMapOutput>> ReduceFunction = + new RequiredImpl<IReduceFunction<TMapOutput>>(); + + /// <summary> + /// The IUpdateFunction type to use. + /// </summary> + public static readonly RequiredImpl<IUpdateFunction<TMapInput, TMapOutput, TResult>> UpdateFunction = + new RequiredImpl<IUpdateFunction<TMapInput, TMapOutput, TResult>>(); + + /// <summary> + /// The IMapFunction type to use. + /// </summary> + public static readonly RequiredImpl<IMapFunction<TMapInput, TMapOutput>> MapFunction = + new RequiredImpl<IMapFunction<TMapInput, TMapOutput>>(); + + public static ConfigurationModule ConfigurationModule = + new IMRUConfiguration<TMapInput, TMapOutput, TResult>() + .BindNamedParameter(GenericType<MapInputCodec<TMapInput>>.Class, MapInputCodec) + .BindNamedParameter(GenericType<MapOutputCodec<TMapOutput>>.Class, MapOutputCodec) + .BindNamedParameter(GenericType<ResultCodec<TResult>>.Class, ResultCodec) + .BindImplementation(GenericType<IReduceFunction<TMapOutput>>.Class, ReduceFunction) + .BindImplementation(GenericType<IUpdateFunction<TMapInput, TMapOutput, TResult>>.Class, UpdateFunction) + .BindImplementation(GenericType<IMapFunction<TMapInput, TMapOutput>>.Class, MapFunction) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs new file mode 100644 index 0000000..5744be8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Interface; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// Describes an IMRU Job. + /// </summary> + /// <seealso cref="IMRUJobDefinitionBuilder" /> + public sealed class IMRUJobDefinition + { + private readonly IConfiguration _configuration; + private readonly string _jobName; + + internal IMRUJobDefinition(IConfiguration configuration, string jobName) + { + _configuration = configuration; + _jobName = jobName; + } + + internal IConfiguration Configuration + { + get { return _configuration; } + } + + internal string JobName + { + get { return _jobName; } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs new file mode 100644 index 0000000..2427016 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Tang.Interface; + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// Use this class to create an IMRU Job Definition. + /// </summary> + /// <seealso cref="IMRUJobDefinition" /> + public sealed class IMRUJobDefinitionBuilder + { + private IConfiguration _configuration; + private string _jobName; + + /// <summary> + /// Set the Configuration used to instantiate the IMapFunction, IReduceFunction, IUpdateFunction and all codec instances + /// </summary> + /// <param name="configuration">The Configuration used to instantiate the IMapFunction instance.</param> + /// <seealso cref="IMRUConfiguration{TMapInput,TMapOutput,TResult}" /> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetConfiguration(IConfiguration configuration) + { + _configuration = configuration; + return this; + } + + /// <summary> + /// Set the name of the job. + /// </summary> + /// <param name="name">the name of the job</param> + /// <returns>this</returns> + public IMRUJobDefinitionBuilder SetJobName(string name) + { + _jobName = name; + return this; + } + + /// <summary> + /// Instantiate the IMRUJobDefinition. + /// </summary> + /// <returns>The IMRUJobDefintion configured.</returns> + /// <exception cref="NullReferenceException">If any of the required paremeters is not set.</exception> + public IMRUJobDefinition Build() + { + if (null == _configuration) + { + throw new NullReferenceException("Configuration can't be null."); + } + if (null == _jobName) + { + throw new NullReferenceException("JobName can't be null."); + } + return new IMRUJobDefinition(_configuration, _jobName); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs new file mode 100644 index 0000000..9c6479c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMapFunction.cs @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// Interface to be implemented by Map functions in IMRU. + /// </summary> + /// <remarks> + /// Objects of this type are stateful in the sense that the Map function will be called many times, and state can be + /// kept in instance variables in between these invocations. + /// The data the map is performed on is assumed to be passed in as a constructor parameter. + /// </remarks> + /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + public interface IMapFunction<TMapInput, TMapOutput> + { + /// <summary> + /// Computes new output based on the given side information and data. + /// </summary> + /// <param name="mapInput">Can't be null.</param> + /// <returns>The output of this map round. Can't be null.</returns> + TMapOutput Map(TMapInput mapInput); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs new file mode 100644 index 0000000..bae1448 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IUpdateFunction.cs @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// The interface for the Update function of IMRU + /// </summary> + /// <remarks> + /// Objects implementing this interface are stateful. + /// </remarks> + /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + /// <typeparam name="TResult">The return type of the computation.</typeparam> + public interface IUpdateFunction<TMapInput, TMapOutput, TResult> + { + /// <summary> + /// The Update task for IMRU. + /// </summary> + /// <param name="input">The input produced by the IMapFunction instances after it was passed through the IReduceFunction.</param> + /// <returns></returns> + UpdateResult<TMapInput, TResult> Update(TMapOutput input); + + /// <summary> + /// Called at the beginning of the computation. + /// </summary> + /// <returns></returns> + UpdateResult<TMapInput, TResult> Initialize(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs new file mode 100644 index 0000000..fc21e79 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapInputCodec.cs @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.IMRU.API.Parameters +{ + /// <summary> + /// The codec to be used for the map input. + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + [NamedParameter("The codec to be used for the map input.")] + public sealed class MapInputCodec<TMapInput> : Name<IStreamingCodec<TMapInput>> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs new file mode 100644 index 0000000..095415d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/MapOutputCodec.cs @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.IMRU.API.Parameters +{ + /// <summary> + /// The codec to be used for the map output. + /// </summary> + /// <typeparam name="TMapOutput"></typeparam> + [NamedParameter("The codec to be used for the map output.")] + public sealed class MapOutputCodec<TMapOutput> : Name<IStreamingCodec<TMapOutput>> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs new file mode 100644 index 0000000..d6a35a5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/Parameters/ResultCodec.cs @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.StreamingCodec; + +namespace Org.Apache.REEF.IMRU.API.Parameters +{ + /// <summary> + /// The codec to be used for the result. + /// </summary> + /// <typeparam name="TResult"></typeparam> + [NamedParameter("The codec to be used for the result.")] + public class ResultCodec<TResult> : Name<IStreamingCodec<TResult>> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/API/UpdateResult.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/UpdateResult.cs b/lang/cs/Org.Apache.REEF.IMRU/API/UpdateResult.cs new file mode 100644 index 0000000..f1a97f4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/API/UpdateResult.cs @@ -0,0 +1,87 @@ +namespace Org.Apache.REEF.IMRU.API +{ + /// <summary> + /// Represents the output of an IUpdateFunction. + /// </summary> + /// <seealso cref="IUpdateFunction{TMapInput,TMapOutput,TResult}" /> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TResult"></typeparam> + public sealed class UpdateResult<TMapInput, TResult> + { + private readonly bool _done; + private readonly bool _hasMapInput; + private readonly bool _hasResult; + private readonly TMapInput _mapInput; + private readonly TResult _result; + + private UpdateResult(bool done, bool hasMapInput, bool hasResult, TResult result, TMapInput mapInput) + { + _result = result; + _mapInput = mapInput; + _hasMapInput = hasMapInput; + _hasResult = hasResult; + _done = done; + } + + internal bool IsDone + { + get { return _done; } + } + + internal bool IsNotDone + { + get { return !_done; } + } + + internal bool HasMapInput + { + get { return _hasMapInput; } + } + + internal bool HasResult + { + get { return _hasResult; } + } + + internal TMapInput MapInput + { + get { return _mapInput; } + } + + internal TResult Result + { + get { return _result; } + } + + /// <summary> + /// Indicate that the IMRU job is done. + /// </summary> + /// <param name="result">The result of the job.</param> + /// <returns></returns> + public static UpdateResult<TMapInput, TResult> Done(TResult result) + { + return new UpdateResult<TMapInput, TResult>(true, false, true, result, default(TMapInput)); + } + + /// <summary> + /// Indicate that another round of computation is needed. + /// </summary> + /// <param name="mapInput">The input to the IMapFunction.Map() method.</param> + /// <returns></returns> + public static UpdateResult<TMapInput, TResult> AnotherRound(TMapInput mapInput) + { + return new UpdateResult<TMapInput, TResult>(false, true, false, default(TResult), mapInput); + } + + /// <summary> + /// Indicate another round and produce some intermediate results. + /// </summary> + /// <param name="mapInput">The input to the IMapFunction.Map() method.</param> + /// <param name="intermediateResult">The intermediate results.</param> + /// <returns></returns> + public static UpdateResult<TMapInput, TResult> AnotherRound(TMapInput mapInput, TResult intermediateResult) + { + return new UpdateResult<TMapInput, TResult>(false, true, true, intermediateResult, mapInput); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs new file mode 100644 index 0000000..366881b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IdentityMapFunction.cs @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.Examples.MapperCount +{ + /// <summary> + /// A MapFunction that returns its input. + /// </summary> + public sealed class IdentityMapFunction : IMapFunction<int, int> + { + [Inject] + private IdentityMapFunction() + { + } + + public int Map(int mapInput) + { + return mapInput; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs new file mode 100644 index 0000000..03d9605 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/IntSumReduceFunction.cs @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Linq; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.Examples.MapperCount +{ + /// <summary> + /// A reduce function that sums integers. + /// </summary> + public sealed class IntSumReduceFunction : IReduceFunction<int> + { + [Inject] + private IntSumReduceFunction() + { + } + + public int Reduce(IEnumerable<int> elements) + { + return elements.Aggregate((x, y) => x + y); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs new file mode 100644 index 0000000..9b5b247 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCount.cs @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Linq; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs; + +namespace Org.Apache.REEF.IMRU.Examples.MapperCount +{ + /// <summary> + /// A simple IMRU program that counts the number of map function instances launched. + /// </summary> + public sealed class MapperCount + { + private readonly IIMRUClient<int, int, int> _imruClient; + + [Inject] + private MapperCount(IIMRUClient<int, int, int> imruClient) + { + _imruClient = imruClient; + } + + /// <summary> + /// </summary> + /// <returns>The number of MapFunction instances that are part of the job.</returns> + public int Run() + { + var results = _imruClient.Submit( + new IMRUJobDefinitionBuilder() + .SetConfiguration( + IMRUConfiguration<int, int, int>.ConfigurationModule + .Set(IMRUConfiguration<int, int, int>.MapFunction, GenericType<IdentityMapFunction>.Class) + .Set(IMRUConfiguration<int, int, int>.ReduceFunction, + GenericType<IntSumReduceFunction>.Class) + .Set(IMRUConfiguration<int, int, int>.UpdateFunction, + GenericType<MapperCountUpdateFunction>.Class) + .Set(IMRUConfiguration<int, int, int>.MapInputCodec, + GenericType<IntStreamingCodec>.Class) + .Set(IMRUConfiguration<int, int, int>.MapOutputCodec, + GenericType<IntStreamingCodec>.Class) + .Set(IMRUConfiguration<int, int, int>.ResultCodec, GenericType<IntStreamingCodec>.Class) + .Build()) + .SetJobName("MapperCount") + .Build() + ); + return results.First(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs new file mode 100644 index 0000000..f69605f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/Examples/MapperCount/MapperCountUpdateFunction.cs @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.Examples.MapperCount +{ + /// <summary> + /// The Update function for the mapper counting job. + /// </summary> + /// <remarks> + /// Upon Initialize(), this sends `1` to all Map Function instances. Each of them returns `1`, which shows up as the + /// parameter passed into `Update`. At that point, we can immediately terminate. + /// </remarks> + public sealed class MapperCountUpdateFunction : IUpdateFunction<int, int, int> + { + [Inject] + private MapperCountUpdateFunction() + { + } + + public UpdateResult<int, int> Update(int input) + { + return UpdateResult<int, int>.Done(input); + } + + public UpdateResult<int, int> Initialize() + { + return UpdateResult<int, int>.AnotherRound(1); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs new file mode 100644 index 0000000..8ff36ab --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/IMRURunner.cs @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.Network.Group.Operators; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.InProcess +{ + /// <summary> + /// Simple, single-threaded executor for IMRU Jobs. + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + internal sealed class IMRURunner<TMapInput, TMapOutput, TResult> + { + private readonly ISet<IMapFunction<TMapInput, TMapOutput>> _mapfunctions; + private readonly IReduceFunction<TMapOutput> _reduceTask; + private readonly IUpdateFunction<TMapInput, TMapOutput, TResult> _updateTask; + + [Inject] + private IMRURunner(MapFunctions<TMapInput, TMapOutput> mapfunctions, + IReduceFunction<TMapOutput> reduceTask, + IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask) + { + _mapfunctions = mapfunctions.Mappers; + _reduceTask = reduceTask; + _updateTask = updateTask; + } + + internal IList<TResult> Run() + { + var results = new List<TResult>(); + var updateResult = _updateTask.Initialize(); + + while (updateResult.IsNotDone) + { + if (updateResult.HasResult) + { + results.Add(updateResult.Result); + } + Debug.Assert(updateResult.HasMapInput); + var mapinput = updateResult.MapInput; + var mapOutputs = _mapfunctions.Select(x => x.Map(mapinput)); + var mapOutput = _reduceTask.Reduce(mapOutputs); + updateResult = _updateTask.Update(mapOutput); + } + if (updateResult.HasResult) + { + results.Add(updateResult.Result); + } + return results; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs new file mode 100644 index 0000000..4207bd4 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using System.Diagnostics; +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.InProcess.Parameters; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.IMRU.InProcess +{ + /// <summary> + /// Implements the IMRU client API for in-process execution + /// </summary> + /// <remarks> + /// This client assumes that all given Configurations can be merged in a conflict-free way. + /// </remarks> + /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + /// <typeparam name="TResult">The return type of the computation.</typeparam> + public class InProcessIMRUClient<TMapInput, TMapOutput, TResult> : IIMRUClient<TMapInput, TMapOutput, TResult> + { + private readonly int _numberOfMappers; + + /// <summary> + /// Use Tang to instantiate this. + /// </summary> + /// <param name="numberOfMappers">The number of mappers to instantiate</param> + [Inject] + private InProcessIMRUClient([Parameter(typeof(NumberOfMappers))] int numberOfMappers) + { + Debug.Assert(numberOfMappers > 0); + _numberOfMappers = numberOfMappers; + } + + public IEnumerable<TResult> Submit(IMRUJobDefinition jobDefinition) + { + var injector = TangFactory.GetTang().NewInjector(jobDefinition.Configuration); + + injector.BindVolatileInstance(GenericType<MapFunctions<TMapInput, TMapOutput>>.Class, + MakeMapFunctions(injector)); + + var runner = injector.GetInstance<IMRURunner<TMapInput, TMapOutput, TResult>>(); + return runner.Run(); + } + + private MapFunctions<TMapInput, TMapOutput> MakeMapFunctions(IInjector injector) + { + ISet<IMapFunction<TMapInput, TMapOutput>> mappers = new HashSet<IMapFunction<TMapInput, TMapOutput>>(); + for (var i = 0; i < _numberOfMappers; ++i) + { + mappers.Add(injector.ForkInjector().GetInstance<IMapFunction<TMapInput, TMapOutput>>()); + } + return new MapFunctions<TMapInput, TMapOutput>(mappers); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs new file mode 100644 index 0000000..75346ea --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUConfiguration.cs @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.InProcess.Parameters; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; + +namespace Org.Apache.REEF.IMRU.InProcess +{ + /// <summary> + /// Configuration module for the in-process IMRU. + /// </summary> + /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> + /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> + /// <typeparam name="TResult">The return type of the computation.</typeparam> + public sealed class InProcessIMRUConfiguration<TMapInput, TMapOutput, TResult> : ConfigurationModuleBuilder + { + /// <summary> + /// The number of Mappers to instantiate. + /// </summary> + public static readonly OptionalParameter<int> NumberOfMappers = new OptionalParameter<int>(); + + public static ConfigurationModule ConfigurationModule = + new InProcessIMRUConfiguration<TMapInput, TMapOutput, TResult>() + .BindImplementation(GenericType<IIMRUClient<TMapInput, TMapOutput, TResult>>.Class, + GenericType<InProcessIMRUClient<TMapInput, TMapOutput, TResult>>.Class) + .BindNamedParameter(GenericType<NumberOfMappers>.Class, NumberOfMappers) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/InProcess/MapFunctions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/MapFunctions.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/MapFunctions.cs new file mode 100644 index 0000000..f54efb3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/MapFunctions.cs @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Collections.Generic; +using Org.Apache.REEF.IMRU.API; + +namespace Org.Apache.REEF.IMRU.InProcess +{ + /// <summary> + /// Helper class to hold a set of mappers. + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + // This is needed as Tang doesn't support BindVolatile into a Set. + internal sealed class MapFunctions<TMapInput, TMapOutput> + { + private readonly ISet<IMapFunction<TMapInput, TMapOutput>> _mappers; + + internal MapFunctions(ISet<IMapFunction<TMapInput, TMapOutput>> mappers) + { + _mappers = mappers; + } + + internal ISet<IMapFunction<TMapInput, TMapOutput>> Mappers + { + get { return _mappers; } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/InProcess/Parameters/NumberOfMappers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/Parameters/NumberOfMappers.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/Parameters/NumberOfMappers.cs new file mode 100644 index 0000000..87a649e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/Parameters/NumberOfMappers.cs @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.InProcess.Parameters +{ + /// <summary> + /// The number of mappers to instantiate. + /// </summary> + [NamedParameter("The number of mappers to instantiate.", defaultValue: "4")] + public sealed class NumberOfMappers : Name<int> + { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index 40c96d3..bdafd6b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -41,6 +41,25 @@ under the License. <Reference Include="System.Xml" /> </ItemGroup> <ItemGroup> + <Compile Include="API\IMRUConfiguration.cs" /> + <Compile Include="API\IIMRUClient.cs" /> + <Compile Include="API\IMapFunction.cs" /> + <Compile Include="API\IMRUJobDefinition.cs" /> + <Compile Include="API\IMRUJobDefinitionBuilder.cs" /> + <Compile Include="API\IUpdateFunction.cs" /> + <Compile Include="API\Parameters\MapInputCodec.cs" /> + <Compile Include="API\Parameters\MapOutputCodec.cs" /> + <Compile Include="API\Parameters\ResultCodec.cs" /> + <Compile Include="API\UpdateResult.cs" /> + <Compile Include="Examples\MapperCount\MapperCount.cs" /> + <Compile Include="Examples\MapperCount\IdentityMapFunction.cs" /> + <Compile Include="Examples\MapperCount\IntSumReduceFunction.cs" /> + <Compile Include="Examples\MapperCount\MapperCountUpdateFunction.cs" /> + <Compile Include="InProcess\IMRURunner.cs" /> + <Compile Include="InProcess\InProcessIMRUClient.cs" /> + <Compile Include="InProcess\InProcessIMRUConfiguration.cs" /> + <Compile Include="InProcess\MapFunctions.cs" /> + <Compile Include="InProcess\Parameters\NumberOfMappers.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> </ItemGroup> <ItemGroup> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs index deeb123..0b49374 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/Properties/AssemblyInfo.cs @@ -17,6 +17,7 @@ * under the License. */ using System.Reflection; +using System.Runtime.CompilerServices; using System.Runtime.InteropServices; [assembly: AssemblyTitle("Org.Apache.REEF.IMRU")] @@ -30,4 +31,6 @@ using System.Runtime.InteropServices; [assembly: ComVisible(false)] [assembly: Guid("138891df-d331-4a5e-8514-775611c06f6c")] [assembly: AssemblyVersion("0.12.0.0")] -[assembly: AssemblyFileVersion("0.12.0.0")] \ No newline at end of file +[assembly: AssemblyFileVersion("0.12.0.0")] +// Allow the tests project access to `internal` APIs +[assembly: InternalsVisibleTo("Org.Apache.REEF.IMRU.Tests")] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/5b359ca9/lang/cs/Org.Apache.REEF.sln ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.sln b/lang/cs/Org.Apache.REEF.sln index 68c282f..26ff12b 100644 Binary files a/lang/cs/Org.Apache.REEF.sln and b/lang/cs/Org.Apache.REEF.sln differ
