Repository: incubator-reef Updated Branches: refs/heads/master 9152ea788 -> 63e0c074c
[REEF-648] Add evaluator memory option to IMRU and its examples. This addressed the issue by * giving user an option to give the memory as input in Run.cs * including the memory field in IMRUJobDefinition and IMRUJobDefinitionBuilder * including the memory bindings in driver configuration * fixing a small bug in IMRUClient configuration for BroadcastReduce in Run.cs JIRA: [REEF-648](https://issues.apache.org/jira/browse/REEF-648) Pull Request: This closes #414 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/63e0c074 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/63e0c074 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/63e0c074 Branch: refs/heads/master Commit: 63e0c074c5df4453e1b7c5ecf10c6cd7bb0ccd80 Parents: 9152ea7 Author: Dhruv <dhruv.maha...@gmail.com> Authored: Tue Aug 25 19:45:04 2015 -0700 Committer: Markus Weimer <wei...@apache.org> Committed: Wed Aug 26 11:46:23 2015 -0700 ---------------------------------------------------------------------- .../PipelinedBroadcastAndReduce.cs | 4 ++- lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs | 18 ++++++++++--- .../API/IMRUJobDefinition.cs | 23 ++++++++++++++++ .../API/IMRUJobDefinitionBuilder.cs | 28 ++++++++++++++++++++ .../OnREEF/Client/REEFIMRUClient.cs | 4 +++ 5 files changed, 73 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs index 896a0d0..8f1cbc7 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs @@ -43,7 +43,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce /// <summary> /// Runs the actual broadcast and reduce job /// </summary> - public void Run(int numberofMappers, int chunkSize, int numIterations, int dim) + public void Run(int numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, int updateTaskMemory) { var updateFunctionConfig = TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule @@ -99,6 +99,8 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce numberofMappers.ToString()).Build()) .SetJobName("BroadcastReduce") .SetNumberOfMappers(numberofMappers) + .SetMapperMemory(mapperMemory) + .SetUpdateTaskMemory(updateTaskMemory) .Build()); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs index c0a6a3b..b1554d5 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs @@ -60,6 +60,8 @@ namespace Org.Apache.REEF.IMRU.Examples int chunkSize = 2; int dims = 10; int iterations = 10; + int mapperMemory = 512; + int updateTaskMemory = 512; if (args.Length > 0) { @@ -73,7 +75,17 @@ namespace Org.Apache.REEF.IMRU.Examples if (args.Length > 2) { - iterations = Convert.ToInt32(args[2]); + mapperMemory = Convert.ToInt32(args[2]); + } + + if (args.Length > 3) + { + updateTaskMemory = Convert.ToInt32(args[3]); + } + + if (args.Length > 4) + { + iterations = Convert.ToInt32(args[4]); } IInjector injector; @@ -87,10 +99,10 @@ namespace Org.Apache.REEF.IMRU.Examples else { injector = TangFactory.GetTang() - .NewInjector(OnREEFIMRURunTimeConfiguration<int, int, int>.GetYarnIMRUConfiguration(), tcpPortConfig); + .NewInjector(OnREEFIMRURunTimeConfiguration<int[], int[], int[]>.GetYarnIMRUConfiguration(), tcpPortConfig); } var broadcastReduceExample = injector.GetInstance<PipelinedBroadcastAndReduce>(); - broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims); + broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory); } private static void Main(string[] args) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs index 6a955c2..ed8c211 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs @@ -37,6 +37,8 @@ namespace Org.Apache.REEF.IMRU.API private readonly IConfiguration _mapInputPipelineDataConverterConfiguration; private readonly IConfiguration _partitionedDatasetConfiguration; private readonly int _numberOfMappers; + private readonly int _memoryPerMapper; + private readonly int _updateTaskMemory; /// <summary> /// Constructor @@ -54,6 +56,7 @@ namespace Org.Apache.REEF.IMRU.API /// <param name="partitionedDatasetConfiguration">Configuration of partitioned /// dataset</param> /// <param name="numberOfMappers">Number of mappers</param> + /// <param name="memoryPerMapper">Per Mapper memory.</param> /// <param name="jobName">Job name</param> internal IMRUJobDefinition( IConfiguration mapFunctionConfiguration, @@ -65,6 +68,8 @@ namespace Org.Apache.REEF.IMRU.API IConfiguration mapInputPipelineDataConverterConfiguration, IConfiguration partitionedDatasetConfiguration, int numberOfMappers, + int memoryPerMapper, + int updateTaskMemory, string jobName) { _mapFunctionConfiguration = mapFunctionConfiguration; @@ -77,6 +82,8 @@ namespace Org.Apache.REEF.IMRU.API _partitionedDatasetConfiguration = partitionedDatasetConfiguration; _numberOfMappers = numberOfMappers; _jobName = jobName; + _memoryPerMapper = memoryPerMapper; + _updateTaskMemory = updateTaskMemory; } /// <summary> @@ -161,5 +168,21 @@ namespace Org.Apache.REEF.IMRU.API internal int NumberOfMappers { get { return _numberOfMappers; } } + + /// <summary> + /// Memory for each mapper in MB + /// </summary> + internal int MapperMemory + { + get { return _memoryPerMapper; } + } + + /// <summary> + /// Memory for update task in MB + /// </summary> + internal int UpdateTaskMemory + { + get { return _updateTaskMemory; } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs index 7f54459..730fb54 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs @@ -36,6 +36,8 @@ namespace Org.Apache.REEF.IMRU.API private string _jobName; private int _numberOfMappers; + private int _memoryPerMapper; + private int _updateTaskMemory; private IConfiguration _mapFunctionConfiguration; private IConfiguration _mapInputCodecConfiguration; private IConfiguration _updateFunctionCodecsConfiguration; @@ -56,6 +58,8 @@ namespace Org.Apache.REEF.IMRU.API _mapInputPipelineDataConverterConfiguration = EmptyConfiguration; _mapOutputPipelineDataConverterConfiguration = EmptyConfiguration; _partitionedDatasetConfiguration = EmptyConfiguration; + _memoryPerMapper = 512; + _updateTaskMemory = 512; } /// <summary> @@ -177,6 +181,28 @@ namespace Org.Apache.REEF.IMRU.API } /// <summary> + /// Sets mapper memory + /// </summary> + /// <param name="memory">memory in MB</param> + /// <returns></returns> + public IMRUJobDefinitionBuilder SetMapperMemory(int memory) + { + _memoryPerMapper = memory; + return this; + } + + /// <summary> + /// Set update task memory + /// </summary> + /// <param name="memory">memory in MB</param> + /// <returns></returns> + public IMRUJobDefinitionBuilder SetUpdateTaskMemory(int memory) + { + _updateTaskMemory = memory; + return this; + } + + /// <summary> /// Instantiate the IMRUJobDefinition. /// </summary> /// <returns>The IMRUJobDefintion configured.</returns> @@ -225,6 +251,8 @@ namespace Org.Apache.REEF.IMRU.API _mapInputPipelineDataConverterConfiguration, _partitionedDatasetConfiguration, _numberOfMappers, + _memoryPerMapper, + _updateTaskMemory, _jobName); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs index 99f5313..9466086 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs @@ -102,6 +102,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client _configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration)) .BindNamedParameter(typeof (SerializedReduceConfiguration), _configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration)) + .BindNamedParameter(typeof (MemoryPerMapper), + jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture)) + .BindNamedParameter(typeof (MemoryForUpdateTask), + jobDefinition.UpdateTaskMemory.ToString(CultureInfo.InvariantCulture)) .Build(); // The JobSubmission contains the Driver configuration as well as the files needed on the Driver.