Repository: incubator-reef
Updated Branches:
  refs/heads/master 9152ea788 -> 63e0c074c


[REEF-648] Add evaluator memory option to IMRU and its examples.

This addressed the issue by

  * giving the user an option to specify the memory as input in Run.cs
  * including the memory field in IMRUJobDefinition and
    IMRUJobDefinitionBuilder
  * including the memory bindings in driver configuration
  * fixing a small bug in the IMRUClient configuration for BroadcastReduce
    in Run.cs (the YARN runtime configuration was built with `int` type
    parameters instead of `int[]`)

JIRA:
  [REEF-648](https://issues.apache.org/jira/browse/REEF-648)

Pull Request:
  This closes #414


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/63e0c074
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/63e0c074
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/63e0c074

Branch: refs/heads/master
Commit: 63e0c074c5df4453e1b7c5ecf10c6cd7bb0ccd80
Parents: 9152ea7
Author: Dhruv <dhruv.maha...@gmail.com>
Authored: Tue Aug 25 19:45:04 2015 -0700
Committer: Markus Weimer <wei...@apache.org>
Committed: Wed Aug 26 11:46:23 2015 -0700

----------------------------------------------------------------------
 .../PipelinedBroadcastAndReduce.cs              |  4 ++-
 lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs    | 18 ++++++++++---
 .../API/IMRUJobDefinition.cs                    | 23 ++++++++++++++++
 .../API/IMRUJobDefinitionBuilder.cs             | 28 ++++++++++++++++++++
 .../OnREEF/Client/REEFIMRUClient.cs             |  4 +++
 5 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
index 896a0d0..8f1cbc7 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -43,7 +43,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Runs the actual broadcast and reduce job
         /// </summary>
-        public void Run(int numberofMappers, int chunkSize, int numIterations, 
int dim)
+        public void Run(int numberofMappers, int chunkSize, int numIterations, 
int dim, int mapperMemory, int updateTaskMemory)
         {
             var updateFunctionConfig =
                 
TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], 
int[], int[]>.ConfigurationModule
@@ -99,6 +99,8 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                             numberofMappers.ToString()).Build())
                     .SetJobName("BroadcastReduce")
                     .SetNumberOfMappers(numberofMappers)
+                    .SetMapperMemory(mapperMemory)
+                    .SetUpdateTaskMemory(updateTaskMemory)
                     .Build());
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index c0a6a3b..b1554d5 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -60,6 +60,8 @@ namespace Org.Apache.REEF.IMRU.Examples
             int chunkSize = 2;
             int dims = 10;
             int iterations = 10;
+            int mapperMemory = 512;
+            int updateTaskMemory = 512;
 
             if (args.Length > 0)
             {
@@ -73,7 +75,17 @@ namespace Org.Apache.REEF.IMRU.Examples
 
             if (args.Length > 2)
             {
-                iterations = Convert.ToInt32(args[2]);
+                mapperMemory = Convert.ToInt32(args[2]);
+            }
+
+            if (args.Length > 3)
+            {
+                updateTaskMemory = Convert.ToInt32(args[3]);
+            }
+
+            if (args.Length > 4)
+            {
+                iterations = Convert.ToInt32(args[4]);
             }
 
             IInjector injector;
@@ -87,10 +99,10 @@ namespace Org.Apache.REEF.IMRU.Examples
             else
             {
                 injector = TangFactory.GetTang()
-                    .NewInjector(OnREEFIMRURunTimeConfiguration<int, int, 
int>.GetYarnIMRUConfiguration(), tcpPortConfig);
+                    .NewInjector(OnREEFIMRURunTimeConfiguration<int[], int[], 
int[]>.GetYarnIMRUConfiguration(), tcpPortConfig);
             }
             var broadcastReduceExample = 
injector.GetInstance<PipelinedBroadcastAndReduce>();
-            broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, 
dims);
+            broadcastReduceExample.Run(numNodes - 1, chunkSize, iterations, 
dims, mapperMemory, updateTaskMemory);
         }
 
         private static void Main(string[] args)

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index 6a955c2..ed8c211 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -37,6 +37,8 @@ namespace Org.Apache.REEF.IMRU.API
         private readonly IConfiguration 
_mapInputPipelineDataConverterConfiguration;
         private readonly IConfiguration _partitionedDatasetConfiguration;
         private readonly int _numberOfMappers;
+        private readonly int _memoryPerMapper;
+        private readonly int _updateTaskMemory;
 
         /// <summary>
         /// Constructor
@@ -54,6 +56,7 @@ namespace Org.Apache.REEF.IMRU.API
         /// <param name="partitionedDatasetConfiguration">Configuration of 
partitioned 
         /// dataset</param>
         /// <param name="numberOfMappers">Number of mappers</param>
+        /// <param name="memoryPerMapper">Per Mapper memory.</param>
         /// <param name="jobName">Job name</param>
         internal IMRUJobDefinition(
             IConfiguration mapFunctionConfiguration,
@@ -65,6 +68,8 @@ namespace Org.Apache.REEF.IMRU.API
             IConfiguration mapInputPipelineDataConverterConfiguration,
             IConfiguration partitionedDatasetConfiguration,
             int numberOfMappers,
+            int memoryPerMapper,
+            int updateTaskMemory,
             string jobName)
         {
             _mapFunctionConfiguration = mapFunctionConfiguration;
@@ -77,6 +82,8 @@ namespace Org.Apache.REEF.IMRU.API
             _partitionedDatasetConfiguration = partitionedDatasetConfiguration;
             _numberOfMappers = numberOfMappers;
             _jobName = jobName;
+            _memoryPerMapper = memoryPerMapper;
+            _updateTaskMemory = updateTaskMemory;
         }
 
         /// <summary>
@@ -161,5 +168,21 @@ namespace Org.Apache.REEF.IMRU.API
         internal int NumberOfMappers {
             get { return _numberOfMappers; }
         }
+
+        /// <summary>
+        /// Memory for each mapper in MB
+        /// </summary>
+        internal int MapperMemory
+        {
+            get { return _memoryPerMapper; }
+        }
+
+        /// <summary>
+        /// Memory for update task in MB
+        /// </summary>
+        internal int UpdateTaskMemory
+        {
+            get { return _updateTaskMemory; }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index 7f54459..730fb54 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -36,6 +36,8 @@ namespace Org.Apache.REEF.IMRU.API
 
         private string _jobName;
         private int _numberOfMappers;
+        private int _memoryPerMapper;
+        private int _updateTaskMemory;
         private IConfiguration _mapFunctionConfiguration;
         private IConfiguration _mapInputCodecConfiguration;
         private IConfiguration _updateFunctionCodecsConfiguration;
@@ -56,6 +58,8 @@ namespace Org.Apache.REEF.IMRU.API
             _mapInputPipelineDataConverterConfiguration = EmptyConfiguration;
             _mapOutputPipelineDataConverterConfiguration = EmptyConfiguration;
             _partitionedDatasetConfiguration = EmptyConfiguration;
+            _memoryPerMapper = 512;
+            _updateTaskMemory = 512;
         }
 
         /// <summary>
@@ -177,6 +181,28 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Sets mapper memory
+        /// </summary>
+        /// <param name="memory">memory in MB</param>
+        /// <returns></returns>
+        public IMRUJobDefinitionBuilder SetMapperMemory(int memory)
+        {
+            _memoryPerMapper = memory;
+            return this;
+        }
+
+        /// <summary>
+        /// Set update task memory
+        /// </summary>
+        /// <param name="memory">memory in MB</param>
+        /// <returns></returns>
+        public IMRUJobDefinitionBuilder SetUpdateTaskMemory(int memory)
+        {
+            _updateTaskMemory = memory;
+            return this;
+        }
+
+        /// <summary>
         /// Instantiate the IMRUJobDefinition.
         /// </summary>
         /// <returns>The IMRUJobDefintion configured.</returns>
@@ -225,6 +251,8 @@ namespace Org.Apache.REEF.IMRU.API
                 _mapInputPipelineDataConverterConfiguration,
                 _partitionedDatasetConfiguration,
                 _numberOfMappers,
+                _memoryPerMapper,
+                _updateTaskMemory,
                 _jobName);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/63e0c074/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 99f5313..9466086 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -102,6 +102,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                     
_configurationSerializer.ToString(jobDefinition.MapOutputPipelineDataConverterConfiguration))
                 .BindNamedParameter(typeof (SerializedReduceConfiguration),
                     
_configurationSerializer.ToString(jobDefinition.ReduceFunctionConfiguration))
+                .BindNamedParameter(typeof (MemoryPerMapper),
+                    
jobDefinition.MapperMemory.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof (MemoryForUpdateTask),
+                    
jobDefinition.UpdateTaskMemory.ToString(CultureInfo.InvariantCulture))
                 .Build();
 
             // The JobSubmission contains the Driver configuration as well as 
the files needed on the Driver.

Reply via email to