Repository: reef
Updated Branches:
  refs/heads/master cf8275e39 -> 6a1b710b0


[REEF-1224] IMRU Fault Tolerance - Separate Data downloading from Task injection

This addresses the issue by
  * forcing IInputPartition to be instantiated at the root context and
    service level in mappers
  * instantiating the Network service at the task level to make fault
    tolerance easier

JIRA:
  [REEF-1224](https://issues.apache.org/jira/browse/REEF-1224)

Pull Request:
  This closes #952


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/6a1b710b
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/6a1b710b
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/6a1b710b

Branch: refs/heads/master
Commit: 6a1b710b08df0906f48da44d1374522e9dc64865
Parents: cf8275e
Author: Dhruv <[email protected]>
Authored: Tue Mar 22 14:05:08 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Apr 22 10:45:27 2016 -0700

----------------------------------------------------------------------
 .../MapperCount/IdentityMapFunction.cs          |   4 +-
 .../MapperCount/MapperCount.cs                  |   3 +-
 .../PipelinedBroadcastAndReduce.cs              |   3 +-
 lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs |   5 +-
 .../InProcess/InProcessIMRUClient.cs            |   3 +-
 .../OnREEF/Client/REEFIMRUClient.cs             |  19 +-
 .../OnREEF/Driver/DataLoadingContext.cs         |  71 ++++
 .../OnREEF/Driver/IMRUConstants.cs              |   1 +
 .../OnREEF/Driver/IMRUDriver.cs                 | 387 ++++++++++++-------
 .../OnREEF/Driver/IMRUSystemException.cs        |  57 +++
 ...NumberOfEvalutorFailuresExceededException.cs |  47 +++
 .../ServiceAndContextConfigurationProvider.cs   | 298 +++++++++-----
 .../Org.Apache.REEF.IMRU.csproj                 |   3 +
 13 files changed, 653 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs
index 35228a2..2eca3fd 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.IO;
 using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IO.PartitionedData;
 using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.IMRU.Examples.MapperCount
@@ -26,7 +28,7 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount
     public sealed class IdentityMapFunction : IMapFunction<int, int>
     {
         [Inject]
-        private IdentityMapFunction()
+        private IdentityMapFunction(IInputPartition<Stream> dataPartition)
         {
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs
index e00370b..a559ff8 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.IO;
 using System.Linq;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.OnREEF.Parameters;
@@ -49,7 +50,7 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount
         /// <returns>The number of MapFunction instances that are part of the 
job.</returns>
         public int Run(int numberofMappers, string outputFile, IConfiguration 
fileSystemConfig)
         {
-            var results = _imruClient.Submit<int, int, int>(
+            var results = _imruClient.Submit<int, int, int, Stream>(
                 new IMRUJobDefinitionBuilder()
                     .SetMapFunctionConfiguration(IMRUMapConfiguration<int, 
int>.ConfigurationModule
                         .Set(IMRUMapConfiguration<int, int>.MapFunction, 
GenericType<IdentityMapFunction>.Class)

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
index b438fbf..de1598c 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System.Globalization;
+using System.IO;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IO.PartitionedData.Random;
 using Org.Apache.REEF.Tang.Annotations;
@@ -73,7 +74,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                         chunkSize.ToString(CultureInfo.InvariantCulture))
                     .Build();
 
-            var results = _imruClient.Submit<int[], int[], int[]>(
+            var results = _imruClient.Submit<int[], int[], int[], Stream>(
                 new IMRUJobDefinitionBuilder()
                     .SetMapFunctionConfiguration(IMRUMapConfiguration<int[], 
int[]>.ConfigurationModule
                         .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
index 2fa6b3c..bf92820 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs
@@ -22,16 +22,17 @@ using Org.Apache.REEF.Utilities.Attributes;
 namespace Org.Apache.REEF.IMRU.API
 {
     public interface IIMRUClient
-    {        
+    {
         /// <summary>
         /// Submit the given job for execution.
         /// </summary>
         /// <typeparam name="TMapInput">The type of the side information 
provided to the Map function</typeparam>
         /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
         /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
+        /// <typeparam name="TPartitionType">Type of data partition (Generic 
type in IInputPartition)</typeparam>
         /// <param name="jobDefinition">IMRU job definition</param>
         /// <returns>Result of IMRU</returns>
-        IEnumerable<TResult> Submit<TMapInput, TMapOutput, 
TResult>(IMRUJobDefinition jobDefinition);
+        IEnumerable<TResult> Submit<TMapInput, TMapOutput, TResult, 
TPartitionType>(IMRUJobDefinition jobDefinition);
 
         /// <summary>
         /// DriverHttpEndPoint returned by IReefClient after job submission

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
index d79e4c4..a2acf5a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs
@@ -64,9 +64,10 @@ namespace Org.Apache.REEF.IMRU.InProcess
         /// <typeparam name="TMapInput">The type of the side information 
provided to the Map function</typeparam>
         /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
         /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
+        /// <typeparam name="TPartitionType">Type of data partition (Generic 
type in IInputPartition)</typeparam>
         /// <param name="jobDefinition">Job definition given by the 
user</param>
         /// <returns>The result of the job</returns>
-        public IEnumerable<TResult> Submit<TMapInput, TMapOutput, 
TResult>(IMRUJobDefinition jobDefinition)
+        public IEnumerable<TResult> Submit<TMapInput, TMapOutput, TResult, 
TPartitionType>(IMRUJobDefinition jobDefinition)
         {
             IConfiguration overallPerMapConfig = null;
             try

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index c2f82fd..969a874 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -69,9 +69,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
         /// <typeparam name="TMapInput">The type of the side information 
provided to the Map function</typeparam>
         /// <typeparam name="TMapOutput">The return type of the Map 
function</typeparam>
         /// <typeparam name="TResult">The return type of the 
computation.</typeparam>
+        /// <typeparam name="TPartitionType">Type of data partition (Generic 
type in IInputPartition)</typeparam>
         /// <param name="jobDefinition">IMRU job definition given by the 
user</param>
         /// <returns>Null as results will be later written to some 
directory</returns>
-        IEnumerable<TResult> IIMRUClient.Submit<TMapInput, TMapOutput, 
TResult>(IMRUJobDefinition jobDefinition)
+        IEnumerable<TResult> IIMRUClient.Submit<TMapInput, TMapOutput, 
TResult, TPartitionType>(IMRUJobDefinition jobDefinition)
         {
             string driverId = string.Format("IMRU-{0}-Driver", 
jobDefinition.JobName);
             IConfiguration overallPerMapConfig = null;
@@ -90,15 +91,19 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
             {
                 DriverConfiguration.ConfigurationModule
                     .Set(DriverConfiguration.OnEvaluatorAllocated,
-                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
                     .Set(DriverConfiguration.OnDriverStarted,
-                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
                     .Set(DriverConfiguration.OnContextActive,
-                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
                     .Set(DriverConfiguration.OnTaskCompleted,
-                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
                     .Set(DriverConfiguration.OnEvaluatorFailed,
-                        GenericType<IMRUDriver<TMapInput, TMapOutput, 
TResult>>.Class)
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                    .Set(DriverConfiguration.OnContextFailed,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                    .Set(DriverConfiguration.OnTaskFailed,
+                        GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
                     .Set(DriverConfiguration.CustomTraceLevel, 
TraceLevel.Info.ToString())
                     .Build(),
                 TangFactory.GetTang().NewConfigurationBuilder()
@@ -146,7 +151,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
             // The JobSubmission contains the Driver configuration as well as 
the files needed on the Driver.
             var imruJobSubmission = _jobRequestBuilder
                 .AddDriverConfiguration(imruDriverConfiguration)
-                .AddGlobalAssemblyForType(typeof(IMRUDriver<TMapInput, 
TMapOutput, TResult>))
+                .AddGlobalAssemblyForType(typeof(IMRUDriver<TMapInput, 
TMapOutput, TResult, TPartitionType>))
                 .SetJobIdentifier(jobDefinition.JobName)
                 .SetDriverMemory(5000)
                 .Build();

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs
new file mode 100644
index 0000000..5ff36de
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// This is part of Root context of map evaluators and instantiates 
+    /// input partition and later will call AddCache function also once
+    /// REEF-1339 is resolved.
+    /// </summary>
+    /// <typeparam name="T">Data Handle Type</typeparam>
+    internal class DataLoadingContext<T> : IObserver<IContextStart>
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(DataLoadingContext<T>));
+        private readonly IInputPartition<T> _partition;
+
+        [Inject]
+        private DataLoadingContext(IInputPartition<T> partition)
+        {
+            _partition = partition;
+            Logger.Log(Level.Verbose, "Entered data loading context");
+        }
+
+        /// <summary>
+        /// Specifies what to do when context starts.
+        /// </summary>
+        /// <param name="value">context start token</param>
+        /// TODO[REEF-1339] - AddCache() function of IInputPartition will be 
called here.
+        public void OnNext(IContextStart value)
+        {
+        }
+
+        /// <summary>
+        /// Specifies what to do if error occurs. We throw 
+        /// the caught exception in this case.
+        /// </summary>
+        /// <param name="error">Exception</param>
+        public void OnError(Exception error)
+        {
+            Exceptions.Throw(error, "Error occured in Data Loading context 
start", Logger);
+        }
+
+        /// <summary>
+        /// Specifies what to do at completion. In this case do nothing.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs
index dd9edf2..f4fba2e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs
@@ -25,5 +25,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         internal const int TreeFanout = 2;
         internal const string MapTaskPrefix = "IMRUMap";
         internal const string UpdateTaskName = "IMRUMaster";
+        internal const string MasterContextId = "MasterRootContext";
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 9197703..59be761 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -31,6 +31,7 @@ using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
 using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.IMRU.OnREEF.ResultHandler;
 using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
 using Org.Apache.REEF.Network.Group.Pipelining;
@@ -53,32 +54,38 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     /// <typeparam name="TMapInput">Map Input</typeparam>
     /// <typeparam name="TMapOutput">Map output</typeparam>
     /// <typeparam name="TResult">Result</typeparam>
-    internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult> : 
IObserver<IDriverStarted>,
-        IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, 
IObserver<ICompletedTask>, IObserver<IFailedEvaluator>
+    /// <typeparam name="TPartitionType">Type of data partition (Generic type 
in IInputPartition)</typeparam>
+    internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType> 
+        : IObserver<IDriverStarted>,
+        IObserver<IAllocatedEvaluator>,
+        IObserver<IActiveContext>,
+        IObserver<ICompletedTask>,
+        IObserver<IFailedEvaluator>,
+        IObserver<IFailedContext>,
+        IObserver<IFailedTask>
     {
-        private static readonly Logger Logger = 
Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult>));
+        private static readonly Logger Logger =
+            Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>));
 
         private readonly ConfigurationManager _configurationManager;
-        private readonly IPartitionedInputDataSet _dataSet;
+        private readonly int _totalMappers;
         private readonly IEvaluatorRequestor _evaluatorRequestor;
         private ICommunicationGroupDriver _commGroup;
         private readonly IGroupCommDriver _groupCommDriver;
         private readonly TaskStarter _groupCommTaskStarter;
-        private readonly ConcurrentStack<string> _taskIdStack;
         private readonly ConcurrentStack<IConfiguration> 
_perMapperConfiguration;
-        private readonly Stack<IPartitionDescriptor> _partitionDescriptorStack;
         private readonly int _coresPerMapper;
         private readonly int _coresForUpdateTask;
         private readonly int _memoryPerMapper;
         private readonly int _memoryForUpdateTask;
         private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs;
-        private readonly ConcurrentBag<ICompletedTask> _completedTasks;
+        private readonly ISet<ICompletedTask> _completedTasks = new 
HashSet<ICompletedTask>();
         private readonly int _allowedFailedEvaluators;
         private int _currentFailedEvaluators = 0;
-        private bool _reachedUpdateTaskActiveContext = false;
         private readonly bool _invokeGC;
+        private int _numberOfReadyTasks = 0;
 
-        private readonly ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput>
+        private readonly ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput, TPartitionType>
             _serviceAndContextConfigurationProvider;
 
         [Inject]
@@ -94,7 +101,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             [Parameter(typeof(InvokeGC))] bool invokeGC,
             IGroupCommDriver groupCommDriver)
         {
-            _dataSet = dataSet;
             _configurationManager = configurationManager;
             _evaluatorRequestor = evaluatorRequestor;
             _groupCommDriver = groupCommDriver;
@@ -103,23 +109,23 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             _memoryPerMapper = memoryPerMapper;
             _memoryForUpdateTask = memoryForUpdateTask;
             _perMapperConfigs = perMapperConfigs;
-            _completedTasks = new ConcurrentBag<ICompletedTask>();
+            _totalMappers = dataSet.Count;
+
             _allowedFailedEvaluators = (int)(failedEvaluatorsFraction * 
dataSet.Count);
             _invokeGC = invokeGC;
 
             AddGroupCommunicationOperators();
-            _groupCommTaskStarter = new TaskStarter(_groupCommDriver, 
_dataSet.Count + 1);
-
-            _taskIdStack = new ConcurrentStack<string>();
-            _perMapperConfiguration = new ConcurrentStack<IConfiguration>();
-            _partitionDescriptorStack = new Stack<IPartitionDescriptor>();
-            ConstructTaskIdAndPartitionDescriptorStack();
+            _groupCommTaskStarter = new TaskStarter(_groupCommDriver, 
_totalMappers + 1);
+            _perMapperConfiguration = 
ConstructPerMapperConfigStack(_totalMappers);
             _serviceAndContextConfigurationProvider =
-                new ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput>(dataSet.Count + 1, groupCommDriver,
-                    _configurationManager, _partitionDescriptorStack);
-
-            var msg = string.Format("map task memory:{0}, update task 
memory:{1}, map task cores:{2}, update task cores:{3}",
-                _memoryPerMapper, _memoryForUpdateTask, _coresPerMapper, 
_coresForUpdateTask);
+                new ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput, TPartitionType>(dataSet);
+
+            var msg =
+                string.Format("map task memory:{0}, update task memory:{1}, 
map task cores:{2}, update task cores:{3}",
+                    _memoryPerMapper,
+                    _memoryForUpdateTask,
+                    _coresPerMapper,
+                    _coresForUpdateTask);
             Logger.Log(Level.Info, msg);
         }
 
@@ -136,177 +142,267 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <summary>
         /// Specifies context and service configuration for evaluator depending
         /// on whether it is for Update function or for map function
-        /// Also handles evaluator failures
         /// </summary>
         /// <param name="allocatedEvaluator">The allocated evaluator</param>
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
-            var configs = 
_serviceAndContextConfigurationProvider.GetNextConfiguration(allocatedEvaluator.Id);
+            var configs =
+                
_serviceAndContextConfigurationProvider.GetContextConfigurationForEvaluatorById(allocatedEvaluator.Id);
             allocatedEvaluator.SubmitContextAndService(configs.Context, 
configs.Service);
         }
 
         /// <summary>
-        /// Specfies the Map or Update task to run on the active context
+        /// Specifies the Map or Update task to run on the active context
         /// </summary>
         /// <param name="activeContext"></param>
         public void OnNext(IActiveContext activeContext)
         {
             Logger.Log(Level.Verbose, string.Format("Received Active Context 
{0}", activeContext.Id));
 
-            if (_groupCommDriver.IsMasterTaskContext(activeContext))
+            if 
(_serviceAndContextConfigurationProvider.IsMasterEvaluatorId(activeContext.EvaluatorId))
             {
-                _reachedUpdateTaskActiveContext = true;
-                RequestMapEvaluators(_dataSet.Count);
-
-                var partialTaskConf =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder(new[]
-                        {
-                            TaskConfiguration.ConfigurationModule
-                                .Set(TaskConfiguration.Identifier,
-                                    IMRUConstants.UpdateTaskName)
-                                .Set(TaskConfiguration.Task,
-                                    GenericType<UpdateTaskHost<TMapInput, 
TMapOutput, TResult>>.Class)
-                                .Build(),
-                            _configurationManager.UpdateFunctionConfiguration,
-                            _configurationManager.ResultHandlerConfiguration
-                        })
-                        .BindNamedParameter(typeof(InvokeGC), 
_invokeGC.ToString())
-                        .Build();
-
-                try
-                {
-                    TangFactory.GetTang()
-                        .NewInjector(partialTaskConf, 
_configurationManager.UpdateFunctionCodecsConfiguration)
-                        .GetInstance<IIMRUResultHandler<TResult>>();
-                }
-                catch (InjectionException)
-                {
-                    partialTaskConf = 
TangFactory.GetTang().NewConfigurationBuilder(partialTaskConf)
-                        
.BindImplementation(GenericType<IIMRUResultHandler<TResult>>.Class,
-                            GenericType<DefaultResultHandler<TResult>>.Class)
-                        .Build();
-                    Logger.Log(Level.Warning,
-                        "User has not given any way to handle IMRU result, 
defaulting to ignoring it");
-                }
-
+                Logger.Log(Level.Verbose, "Submitting master task");
                 _commGroup.AddTask(IMRUConstants.UpdateTaskName);
-                _groupCommTaskStarter.QueueTask(partialTaskConf, 
activeContext);
+                _groupCommTaskStarter.QueueTask(GetUpdateTaskConfiguration(), 
activeContext);
+                RequestMapEvaluators(_totalMappers);
             }
             else
             {
-                string taskId;
-
-                if (!_taskIdStack.TryPop(out taskId))
-                {
-                    Logger.Log(Level.Warning, "No task Ids exist for the 
active context {0}. Disposing the context.",
-                        activeContext.Id);
-                    activeContext.Dispose();
-                    return;
-                }
-
-                IConfiguration mapSpecificConfig;
-
-                if (!_perMapperConfiguration.TryPop(out mapSpecificConfig))
-                {
-                    Logger.Log(Level.Warning,
-                        "No per map configuration exist for the active context 
{0}. Disposing the context.",
-                        activeContext.Id);
-                    activeContext.Dispose();
-                    return;
-                }
-
-                var partialTaskConf =
-                    TangFactory.GetTang()
-                        .NewConfigurationBuilder(new[]
-                        {
-                            TaskConfiguration.ConfigurationModule
-                                .Set(TaskConfiguration.Identifier, taskId)
-                                .Set(TaskConfiguration.Task, 
GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
-                                .Build(),
-                            _configurationManager.MapFunctionConfiguration,
-                            mapSpecificConfig
-                        })
-                        .BindNamedParameter(typeof(InvokeGC), 
_invokeGC.ToString())
-                        .Build();
-
+                Logger.Log(Level.Verbose, "Submitting map task");
+                
_serviceAndContextConfigurationProvider.RecordActiveContextPerEvaluatorId(activeContext.EvaluatorId);
+                string taskId = 
GetTaskIdByEvaluatorId(activeContext.EvaluatorId);
                 _commGroup.AddTask(taskId);
-                _groupCommTaskStarter.QueueTask(partialTaskConf, 
activeContext);
+                
_groupCommTaskStarter.QueueTask(GetMapTaskConfiguration(activeContext, taskId), 
activeContext);
+                Interlocked.Increment(ref _numberOfReadyTasks);
+                Logger.Log(Level.Verbose, string.Format("{0} Tasks are ready 
for submission", _numberOfReadyTasks));
             }
         }
 
         /// <summary>
-        /// Specfies what to do when the task is completed
+        /// Specifies what to do when the task is completed
         /// In this case just disposes off the task
         /// </summary>
         /// <param name="completedTask">The link to the completed task</param>
         public void OnNext(ICompletedTask completedTask)
         {
-            _completedTasks.Add(completedTask);
-
-            if (_completedTasks.Count != _dataSet.Count + 1)
+            lock (_completedTasks)
             {
-                return;
-            }
-            
-            foreach (var task in _completedTasks)
-            {
-                Logger.Log(Level.Verbose, string.Format("Disposing task: {0}", 
task.Id));
-                task.ActiveContext.Dispose();
+                Logger.Log(Level.Info,
+                    string.Format("Received completed task message from task 
Id: {0}", completedTask.Id));
+                _completedTasks.Add(completedTask);
+
+                if (AreIMRUTasksCompleted())
+                {
+                    ShutDownAllEvaluators();
+                }
             }
         }
 
+        /// <summary>
+        /// Specifies what to do when evaluator fails.
+        /// If we get all completed tasks then ignore the failure
+        /// Else request a new evaluator. If failure happens in middle of IMRU 
+        /// job we expect neighboring evaluators to fail while doing 
+        /// communication and will use FailedTask and FailedContext logic to 
+        /// order shutdown.
+        /// </summary>
+        /// <param name="value"></param>
         public void OnNext(IFailedEvaluator value)
         {
-            Logger.Log(Level.Info, "An evaluator failed, checking if it failed 
before context and service was submitted");
-            int currFailedEvaluators = Interlocked.Increment(ref 
_currentFailedEvaluators);
-
-            if (value.FailedContexts != null && value.FailedContexts.Count != 
0)
+            if (AreIMRUTasksCompleted())
             {
-                Logger.Log(Level.Info, "Some active context failed, cannot 
continue IMRU task");        
-                Exceptions.Throw(new Exception(), Logger);
+                Logger.Log(Level.Info,
+                    string.Format("Evaluator with Id: {0} failed but IMRU task 
is completed. So ignoring.", value.Id));
+                return;
             }
 
+            Logger.Log(Level.Info,
+                string.Format("Evaluator with Id: {0} failed with Exception: 
{1}", value.Id, value.EvaluatorException));
+            int currFailedEvaluators = Interlocked.Increment(ref 
_currentFailedEvaluators);
             if (currFailedEvaluators > _allowedFailedEvaluators)
             {
-                Exceptions.Throw(new Exception("Cannot continue IMRU job, 
Failed evaluators reach maximum limit"),
+                Exceptions.Throw(new 
MaximumNumberOfEvaluatorFailuresExceededException(_allowedFailedEvaluators),
                     Logger);
             }
 
-            Logger.Log(Level.Info, "Requesting for the failed evaluator 
again");
+            
_serviceAndContextConfigurationProvider.RecordEvaluatorFailureById(value.Id);
+            bool isMaster = 
_serviceAndContextConfigurationProvider.IsMasterEvaluatorId(value.Id);
 
-            _serviceAndContextConfigurationProvider.EvaluatorFailed(value.Id);
-
-            // if active context stage is reached for Update Task then assume 
that failed
-            // evaluator belongs to mapper
-            if (_reachedUpdateTaskActiveContext)
+            // If failed evaluator is master then ask for master 
+            // evaluator else ask for mapper evaluator
+            if (!isMaster)
             {
+                Logger.Log(Level.Info, string.Format("Requesting a replacement 
map Evaluator for {0}", value.Id));
                 RequestMapEvaluators(1);
             }
             else
             {
+                Logger.Log(Level.Info, string.Format("Requesting a replacement 
master Evaluator for {0}", value.Id));
                 RequestUpdateEvaluator();
             }
         }
 
         /// <summary>
-        /// Specfies how to handle exception or error
+        /// Specifies what to do if Failed Context is received.
+        /// An exception is thrown if tasks are not completed.
         /// </summary>
-        /// <param name="error">Kind of exception</param>
-        public void OnError(Exception error)
+        /// <param name="value"></param>
+        public void OnNext(IFailedContext value)
         {
-            Logger.Log(Level.Error, "Cannot currently handle the Exception in 
OnError function");
-            throw new NotImplementedException("Cannot currently handle 
exception in OneError", error);
+            if (AreIMRUTasksCompleted())
+            {
+                Logger.Log(Level.Info,
+                    string.Format("Context with Id: {0} failed but IMRU task 
is completed. So ignoring.", value.Id));
+                return;
+            }
+            Exceptions.Throw(new Exception(string.Format("Data Loading Context 
with Id: {0} failed", value.Id)), Logger);
         }
 
         /// <summary>
-        /// Specfies what to do when driver is done
-        /// In this case do nothing
+        /// Specifies what to do if a task fails.
+        /// We throw the exception and fail IMRU unless IMRU job is already 
done.
         /// </summary>
+        /// <param name="value"></param>
+        public void OnNext(IFailedTask value)
+        {
+            if (AreIMRUTasksCompleted())
+            {
+                Logger.Log(Level.Info,
+                    string.Format("Task with Id: {0} failed but IMRU task is 
completed. So ignoring.", value.Id));
+                return;
+            }
+            Exceptions.Throw(new Exception(string.Format("Task with Id: {0} 
failed", value.Id)), Logger);
+        }
+
+        public void OnError(Exception error)
+        {
+        }
+
         public void OnCompleted()
         {
         }
 
+        private bool AreIMRUTasksCompleted()
+        {
+            return _completedTasks.Count >= _totalMappers + 1;
+        }
+
+        private string GetTaskIdByEvaluatorId(string evaluatorId)
+        {
+            return string.Format("{0}-{1}-Version0",
+                IMRUConstants.MapTaskPrefix,
+                
_serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId));
+        }
+
+        /// <summary>
+        /// Shuts down evaluators once all completed task messages are received
+        /// </summary>
+        private void ShutDownAllEvaluators()
+        {
+            foreach (var task in _completedTasks)
+            {
+                Logger.Log(Level.Info, string.Format("Disposing task: {0}", 
task.Id));
+                task.ActiveContext.Dispose();
+            }
+        }
+
+        /// <summary>
+        /// Generates map task configuration given the active context. 
+        /// Merge configurations of all the inputs to the MapTaskHost.
+        /// </summary>
+        /// <param name="activeContext">Active context to which task needs to 
be submitted</param>
+        /// <param name="taskId">Task Id</param>
+        /// <returns>Map task configuration</returns>
+        private IConfiguration GetMapTaskConfiguration(IActiveContext 
activeContext, string taskId)
+        {
+            IConfiguration mapSpecificConfig;
+
+            if (!_perMapperConfiguration.TryPop(out mapSpecificConfig))
+            {
+                Exceptions.Throw(
+                    new IMRUSystemException(string.Format("No per map 
configuration exist for the active context {0}",
+                        activeContext.Id)),
+                    Logger);
+            }
+
+            return TangFactory.GetTang()
+                .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, taskId)
+                    .Set(TaskConfiguration.Task, 
GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class)
+                    .Build(),
+                    _configurationManager.MapFunctionConfiguration,
+                    mapSpecificConfig,
+                    GetGroupCommConfiguration())
+                .BindNamedParameter<InvokeGC, 
bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Generates the update task configuration.
+        /// Merge configurations of all the inputs to the UpdateTaskHost.
+        /// </summary>
+        /// <returns>Update task configuration</returns>
+        private IConfiguration GetUpdateTaskConfiguration()
+        {
+            var partialTaskConf =
+                TangFactory.GetTang()
+                    
.NewConfigurationBuilder(TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier,
+                            IMRUConstants.UpdateTaskName)
+                        .Set(TaskConfiguration.Task,
+                            GenericType<UpdateTaskHost<TMapInput, TMapOutput, 
TResult>>.Class)
+                        .Build(),
+                        _configurationManager.UpdateFunctionConfiguration,
+                        _configurationManager.ResultHandlerConfiguration,
+                        GetGroupCommConfiguration())
+                    .BindNamedParameter<InvokeGC, 
bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString())
+                    .Build();
+
+            // This piece of code basically checks if user has given any 
implementation 
+            // of IIMRUResultHandler. If not then bind it to default 
implementation which 
+            // does nothing. For interfaces with generic type we cannot assign 
default 
+            // implementation.
+            try
+            {
+                TangFactory.GetTang()
+                    .NewInjector(partialTaskConf)
+                    .GetInstance<IIMRUResultHandler<TResult>>();
+            }
+            catch (InjectionException)
+            {
+                partialTaskConf = 
TangFactory.GetTang().NewConfigurationBuilder(partialTaskConf)
+                    
.BindImplementation(GenericType<IIMRUResultHandler<TResult>>.Class,
+                        GenericType<DefaultResultHandler<TResult>>.Class)
+                    .Build();
+                Logger.Log(Level.Info,
+                    "User has not given any way to handle IMRU result, 
defaulting to ignoring it");
+            }
+            return partialTaskConf;
+        }
+
+        /// <summary>
+        /// Generates the group communication configuration to be added 
+        /// to the tasks
+        /// </summary>
+        /// <returns>The group communication configuration</returns>
+        private IConfiguration GetGroupCommConfiguration()
+        {
+            var codecConfig =
+                TangFactory.GetTang()
+                    .NewConfigurationBuilder(
+                        
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
+                            
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
+                            
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
+                        
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
+                        
_configurationManager.UpdateFunctionCodecsConfiguration)
+                    .Build();
+
+            return 
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig);
+        }
+
+        /// <summary>
+        /// Adds broadcast and reduce operators to the default communication 
group
+        /// </summary>
         private void AddGroupCommunicationOperators()
         {
             var reduceFunctionConfig = 
_configurationManager.ReduceFunctionConfiguration;
@@ -369,23 +465,31 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     .Build();
         }
 
-        private void ConstructTaskIdAndPartitionDescriptorStack()
+        /// <summary>
+        /// Constructs the stack of map configurations, one 
+        /// specific to each mapper. If the user does not 
+        /// specify any, each entry is an empty configuration.
+        /// </summary>
+        /// <param name="totalMappers">Total mappers</param>
+        /// <returns>Stack of configuration</returns>
+        private ConcurrentStack<IConfiguration> 
ConstructPerMapperConfigStack(int totalMappers)
         {
-            int counter = 0;
-
-            foreach (var partitionDescriptor in _dataSet)
+            var perMapperConfiguration = new ConcurrentStack<IConfiguration>();
+            for (int i = 0; i < totalMappers; i++)
             {
-                string id = IMRUConstants.MapTaskPrefix + "-Id" + counter + 
"-Version0";
-                _taskIdStack.Push(id);
-                _partitionDescriptorStack.Push(partitionDescriptor);
-
                 var emptyConfig = 
TangFactory.GetTang().NewConfigurationBuilder().Build();
-                IConfiguration config = 
_perMapperConfigs.Aggregate(emptyConfig, (current, configGenerator) => 
Configurations.Merge(current, configGenerator.GetMapperConfiguration(counter, 
_dataSet.Count)));
-                _perMapperConfiguration.Push(config);
-                counter++;
+                IConfiguration config = 
_perMapperConfigs.Aggregate(emptyConfig,
+                    (current, configGenerator) =>
+                        Configurations.Merge(current, 
configGenerator.GetMapperConfiguration(i, totalMappers)));
+                perMapperConfiguration.Push(config);
             }
+            return perMapperConfiguration;
         }
 
+        /// <summary>
+        /// Request map evaluators from resource manager
+        /// </summary>
+        /// <param name="numEvaluators">Number of evaluators to request</param>
         private void RequestMapEvaluators(int numEvaluators)
         {
             _evaluatorRequestor.Submit(
@@ -396,6 +500,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     .Build());
         }
 
+        /// <summary>
+        /// Request update/master evaluator from resource manager
+        /// </summary>
         private void RequestUpdateEvaluator()
         {
             _evaluatorRequestor.Submit(

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUSystemException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUSystemException.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUSystemException.cs
new file mode 100644
index 0000000..94cf619
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUSystemException.cs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Type of exception thrown when possible bugs are detected in IMRU code.
+    /// For example, reaching a forbidden region of code, an inconsistent state, 
etc.
+    /// </summary>
+    public sealed class IMRUSystemException : Exception
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(IMRUSystemException));
+
+        /// <summary>
+        /// Constructor. Prefixes the user message with a note that 
+        /// there may be a bug in the IMRU code.
+        /// </summary>
+        /// <param name="message">The user message for exception</param>
+        public IMRUSystemException(string message)
+            : base(AppendedMessage(message))
+        {
+        }
+
+        /// <summary>
+        /// Constructor. Prefixes the user message with a note that there 
+        /// may be a bug in the IMRU code. Wraps the provided inner exception.
+        /// </summary>
+        /// <param name="message">The user message for exception</param>
+        /// <param name="inner">The underlying exception that caused this 
error</param>
+        public IMRUSystemException(string message, Exception inner)
+            : base(AppendedMessage(message), inner)
+        {
+        }
+
+        private static string AppendedMessage(string message)
+        {
+            return "Possible Bug in the IMRU code: " + message;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/MaximumNumberOfEvalutorFailuresExceededException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/MaximumNumberOfEvalutorFailuresExceededException.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/MaximumNumberOfEvalutorFailuresExceededException.cs
new file mode 100644
index 0000000..df3c5ea
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/MaximumNumberOfEvalutorFailuresExceededException.cs
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Type of exception thrown when the number of failed evaluators exceeds 
+    /// the maximum allowed limit.
+    /// </summary>
+    public sealed class MaximumNumberOfEvaluatorFailuresExceededException : 
Exception
+    {
+        private static readonly Logger Logger =
+            
Logger.GetLogger(typeof(MaximumNumberOfEvaluatorFailuresExceededException));
+
+        /// <summary>
+        /// Constructor for the exception thrown when the number of evaluator 
failures exceeds the maximum limit.
+        /// </summary>
+        /// <param name="maximumAllowedEvaluatorFailures">maximum number of 
evaluators allowed to fail</param>
+        public MaximumNumberOfEvaluatorFailuresExceededException(int 
maximumAllowedEvaluatorFailures)
+            : base(CreateMessage(maximumAllowedEvaluatorFailures))
+        {
+        }
+
+        private static string CreateMessage(int 
maximumAllowedEvaluatorFailures)
+        {
+            return string.Format("Exiting IMRU. Number of failed evaluators 
reach the maximum allowed limit of {0}",
+                maximumAllowedEvaluatorFailures);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
index 0cfeec3..36916db 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -17,12 +17,10 @@
 
 using System;
 using System.Collections.Generic;
-using System.Linq;
-using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Common.Services;
 using Org.Apache.REEF.IO.PartitionedData;
-using Org.Apache.REEF.Network.Group.Config;
-using Org.Apache.REEF.Network.Group.Driver;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Diagnostics;
@@ -36,32 +34,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     /// </summary>
     /// <typeparam name="TMapInput"></typeparam>
     /// <typeparam name="TMapOutput"></typeparam>
-    internal sealed class ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput>
+    /// <typeparam name="TPartitionType"></typeparam>
+    internal sealed class ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput, TPartitionType>
     {
-        private static readonly Logger Logger = 
Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput>));
-
-        private readonly Dictionary<string, ContextAndServiceConfiguration> 
_configurationProvider;
-        private readonly ISet<string> _failedEvaluators;
-        private readonly ISet<string> _submittedEvaluators; 
-        private readonly object _lock;
-        private readonly int _numNodes;
-        private int _assignedPartitionDescriptors;
-        private readonly IGroupCommDriver _groupCommDriver;
-        private readonly ConfigurationManager _configurationManager;
-        private readonly Stack<IPartitionDescriptor> _partitionDescriptors;
-
-        internal ServiceAndContextConfigurationProvider(int numNodes, 
IGroupCommDriver groupCommDriver,
-            ConfigurationManager configurationManager, 
Stack<IPartitionDescriptor> partitionDescriptors)
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput, TPartitionType>));
+
+        private readonly Dictionary<string, string> _partitionIdProvider = new 
Dictionary<string, string>();
+        private readonly ISet<string> _submittedEvaluators = new 
HashSet<string>();
+        private readonly ISet<string> _contextLoadedEvaluators = new 
HashSet<string>(); 
+        private readonly object _lock = new object();
+        private readonly Stack<string> _partitionDescriptorIds = new 
Stack<string>();
+        private readonly IPartitionedInputDataSet _dataset;
+        private string _masterEvaluatorId;
+
+        internal 
ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset)
         {
-            _configurationProvider = new Dictionary<string, 
ContextAndServiceConfiguration>();
-            _failedEvaluators = new HashSet<string>();
-            _submittedEvaluators = new HashSet<string>();
-            _numNodes = numNodes;
-            _groupCommDriver = groupCommDriver;
-            _configurationManager = configurationManager;
-            _assignedPartitionDescriptors = 0;
-            _partitionDescriptors = partitionDescriptors;
-            _lock = new object();
+            _dataset = dataset;
+            foreach (var descriptor in _dataset)
+            {
+                _partitionDescriptorIds.Push(descriptor.Id);
+            }
         }
 
         /// <summary>
@@ -69,113 +61,229 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// submitted evaluator to failed evaluator
         /// </summary>
         /// <param name="evaluatorId"></param>
-        internal void EvaluatorFailed(string evaluatorId)
+        /// <returns>Whether failed evaluator is master or not</returns>
+        internal bool RecordEvaluatorFailureById(string evaluatorId)
         {
             lock (_lock)
             {
+                string msg;
+                bool isMaster = IsMasterEvaluatorId(evaluatorId);
+
+                if (_contextLoadedEvaluators.Contains(evaluatorId))
+                {
+                    msg =
+                        string.Format(
+                            "Failed evaluator:{0} already had context loaded. 
Cannot handle failure at this stage",
+                            evaluatorId);
+                    Exceptions.Throw(new Exception(msg), Logger);
+                }
+
                 if (!_submittedEvaluators.Contains(evaluatorId))
                 {
-                    Exceptions.Throw(new Exception("Failed evaluator was never 
submitted"), Logger);
+                    msg = string.Format("Failed evaluator:{0} was never 
submitted", evaluatorId);
+                    Exceptions.Throw(new Exception(msg), Logger);
+                }
+
+                if (!_partitionIdProvider.ContainsKey(evaluatorId) && 
!isMaster)
+                {
+                    msg = string.Format("Partition descriptor for Failed 
evaluator:{0} not present", evaluatorId);
+                    Exceptions.Throw(new Exception(msg), Logger);
                 }
 
-                _failedEvaluators.Add(evaluatorId);
                 _submittedEvaluators.Remove(evaluatorId);
+
+                if (isMaster)
+                {
+                    Logger.Log(Level.Info, "Failed Evaluator is Master");
+                    _masterEvaluatorId = null;
+                    return true;
+                }
+                
+                Logger.Log(Level.Info, "Failed Evaluator is a Mapper");
+                
_partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]);
+                _partitionIdProvider.Remove(evaluatorId);
+                return false;
             }
         }
 
         /// <summary>
-        /// Gives context and service configuration for next evaluator either 
from failed 
-        /// evaluator or new configuration
+        /// Notifies that active context state has been reached
         /// </summary>
         /// <param name="evaluatorId"></param>
-        /// <returns></returns>
-        internal ContextAndServiceConfiguration GetNextConfiguration(string 
evaluatorId)
+        internal void RecordActiveContextPerEvaluatorId(string evaluatorId)
+        {
+            lock (_lock)
+            {
+                if (!_submittedEvaluators.Contains(evaluatorId))
+                {
+                    var msg = string.Format("Evaluator:{0} never loaded data 
but still reached active context stage",
+                        evaluatorId);
+                    Exceptions.Throw(new Exception(msg), Logger);
+                }
+
+                if (_contextLoadedEvaluators.Contains(evaluatorId))
+                {
+                    var msg = string.Format("Evaluator:{0} already reached the 
active context stage", evaluatorId);
+                    Exceptions.Throw(new Exception(msg), Logger);
+                }
+
+                _contextLoadedEvaluators.Add(evaluatorId);
+                _submittedEvaluators.Remove(evaluatorId);
+            }
+        }
+
+        /// <summary>
+        /// Gets next context configuration. Either master or mapper.
+        /// </summary>
+        /// <param name="evaluatorId">Evaluator Id</param>
+        /// <returns>The context and service configuration</returns>
+        internal ContextAndServiceConfiguration 
GetContextConfigurationForEvaluatorById(string evaluatorId)
         {
             lock (_lock)
             {
                 if (_submittedEvaluators.Contains(evaluatorId))
                 {
-                    Exceptions.Throw(new Exception("The evaluator is already 
submitted"), Logger);
+                    string msg = string.Format("The context is already 
submitted to evaluator:{0}", evaluatorId);
+                    Exceptions.Throw(new Exception(msg), Logger);
                 }
 
-                if (_failedEvaluators.Count == 0 && 
_assignedPartitionDescriptors >= _numNodes)
+                if (_masterEvaluatorId == null)
+                {
+                    Logger.Log(Level.Info, "Submitting root context and 
service for master");
+                    _masterEvaluatorId = evaluatorId;
+                    _submittedEvaluators.Add(evaluatorId);
+                    return new ContextAndServiceConfiguration(
+                        
ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier,
+                            IMRUConstants.MasterContextId).Build(),
+                        
TangFactory.GetTang().NewConfigurationBuilder().Build());
+                }
+
+                Logger.Log(Level.Info, "Submitting root context and service 
for a map task");
+                return 
GetDataLoadingConfigurationForEvaluatorById(evaluatorId);
+            }
+        }
+
+        /// <summary>
+        /// Checks whether evaluator id is that of master
+        /// </summary>
+        /// <param name="evaluatorId">Id of evaluator</param>
+        /// <returns>true if id is that of master</returns>
+        internal bool IsMasterEvaluatorId(string evaluatorId)
+        {
+            return evaluatorId.Equals(_masterEvaluatorId);
+        }
+
+        /// <summary>
+        /// Gets partition Id for the evaluator
+        /// </summary>
+        /// <param name="evaluatorId"></param>
+        /// <returns></returns>
+        internal string GetPartitionIdByEvaluatorId(string evaluatorId)
+        {
+            lock (_lock)
+            {
+                string msg;
+                if (!_submittedEvaluators.Contains(evaluatorId) && 
!_contextLoadedEvaluators.Contains(evaluatorId))
                 {
-                    Exceptions.Throw(new Exception("No more configuration can 
be provided"), Logger);
+                    msg = string.Format("Context for Evaluator:{0} has never 
been submitted", evaluatorId);
+                    Exceptions.Throw(new IMRUSystemException(msg), Logger);
                 }
 
-                // if some failed id exists return that configuration
-                if (_failedEvaluators.Count != 0)
+                if (IsMasterEvaluatorId(evaluatorId))
                 {
-                    string failedEvaluatorId = _failedEvaluators.First();
-                    _failedEvaluators.Remove(failedEvaluatorId);
-                    var config = _configurationProvider[failedEvaluatorId];
-                    _configurationProvider.Remove(failedEvaluatorId);
-                    _configurationProvider[evaluatorId] = config;
+                    msg = string.Format("Evaluator:{0} is master and does not 
get partition", evaluatorId);
+                    Exceptions.Throw(new IMRUSystemException(msg), Logger);
                 }
-                else
+
+                if (!_partitionIdProvider.ContainsKey(evaluatorId))
                 {
-                    _assignedPartitionDescriptors++;
-
-                    if (_configurationProvider.ContainsKey(evaluatorId))
-                    {
-                        Exceptions.Throw(
-                            new Exception(
-                                "Evaluator Id already present in configuration 
cache, they have to be unique"),
-                            Logger);
-                    }
-
-                    // Checks whether to put update task configuration or map 
task configuration
-                    if (_assignedPartitionDescriptors == 1)
-                    {
-                        _configurationProvider[evaluatorId] = 
GetUpdateTaskContextAndServiceConfiguration();
-                    }
-                    else
-                    {
-                        _configurationProvider[evaluatorId] =
-                            
GetMapTaskContextAndServiceConfiguration(_partitionDescriptors.Pop());
-                    }
+                    msg = string.Format("Partition descriptor for 
evaluator:{0} is not present in the mapping", evaluatorId);
+                    Exceptions.Throw(new IMRUSystemException(msg), Logger);   
                 }
 
-                _submittedEvaluators.Add(evaluatorId);
-                return _configurationProvider[evaluatorId];
+                return _partitionIdProvider[evaluatorId];
             }
         }
 
-        private ContextAndServiceConfiguration 
GetMapTaskContextAndServiceConfiguration(IPartitionDescriptor 
partitionDescriptor)
+        /// <summary>
+        /// Gives the context and service configuration for the next evaluator, 
either from a failed 
+        /// evaluator or as a new configuration.
+        /// </summary>
+        /// <param name="evaluatorId">Id of the evaluator to configure</param>
+        /// <returns>Data loading context and service configuration</returns>
+        private ContextAndServiceConfiguration 
GetDataLoadingConfigurationForEvaluatorById(string evaluatorId)
         {
-            var codecConfig =
-                TangFactory.GetTang()
-                    .NewConfigurationBuilder(
-                        
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
-                            
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
-                            
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
-                        
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
-                        _configurationManager.MapInputCodecConfiguration)
-                    .Build();
+            string msg;
+           
+            if (_contextLoadedEvaluators.Contains(evaluatorId))
+            {
+                msg = string.Format("Evaluator:{0} already has the data 
loaded", evaluatorId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            if (_partitionDescriptorIds.Count == 0)
+            {
+                Exceptions.Throw(new IMRUSystemException("No more data 
configuration can be provided"), Logger);
+            }
 
-            var contextConf = _groupCommDriver.GetContextConfiguration();
-            var serviceConf = 
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig, 
partitionDescriptor.GetPartitionConfiguration());
+            if (_partitionIdProvider.ContainsKey(evaluatorId))
+            {
+                msg =
+                    string.Format(
+                        "Evaluator Id:{0} already present in configuration 
cache, they have to be unique",
+                        evaluatorId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
 
-            return new ContextAndServiceConfiguration(contextConf, 
serviceConf);
+            Logger.Log(Level.Info, "Getting a new data loading configuration");
+            _partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop();
+            _submittedEvaluators.Add(evaluatorId);
+
+            msg = string.Format(
+                "Current status: Submitted Evaluators-{0}, Data Loaded 
Evaluators-{1}, Unused data partitions-{2}",
+                _submittedEvaluators.Count,
+                _contextLoadedEvaluators.Count,
+                _partitionDescriptorIds.Count);
+            Logger.Log(Level.Info, msg);
+
+            try
+            {
+                IPartitionDescriptor partitionDescriptor =
+                    
_dataset.GetPartitionDescriptorForId(_partitionIdProvider[evaluatorId]);
+                return 
GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, evaluatorId);
+            }
+            catch (Exception e)
+            {
+                msg = string.Format("Error while trying to access partition 
descriptor:{0} from dataset",
+                    _partitionIdProvider[evaluatorId]);
+                Exceptions.Throw(e, msg, Logger);
+                return null;
+            }
         }
 
-        private ContextAndServiceConfiguration 
GetUpdateTaskContextAndServiceConfiguration()
+        private ContextAndServiceConfiguration 
GetDataLoadingContextAndServiceConfiguration(
+            IPartitionDescriptor partitionDescriptor,
+            string evaluatorId)
         {
-            var codecConfig =
+            var dataLoadingContextConf =
                 TangFactory.GetTang()
-                    .NewConfigurationBuilder(
-                        new[]
-                        {
-                            
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set(
-                                
StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec,
-                                
GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(),
-                            
StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(),
-                            
_configurationManager.UpdateFunctionCodecsConfiguration
-                        })
+                    .NewConfigurationBuilder()
+                    .BindSetEntry<ContextConfigurationOptions.StartHandlers, 
DataLoadingContext<TPartitionType>, IObserver<IContextStart>>(
+                            
GenericType<ContextConfigurationOptions.StartHandlers>.Class,
+                            
GenericType<DataLoadingContext<TPartitionType>>.Class)
                     .Build();
 
-            var serviceConf = 
Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig);
-            return new 
ContextAndServiceConfiguration(_groupCommDriver.GetContextConfiguration(), 
serviceConf);
+            var serviceConf =
+                TangFactory.GetTang()
+                    
.NewConfigurationBuilder(ServiceConfiguration.ConfigurationModule.Build(),
+                        dataLoadingContextConf,
+                        partitionDescriptor.GetPartitionConfiguration())
+                    .Build();
+
+            var contextConf = ContextConfiguration.ConfigurationModule
+                .Set(ContextConfiguration.Identifier, 
string.Format("DataLoading-{0}", evaluatorId))
+                .Build();
+            return new ContextAndServiceConfiguration(contextConf, 
serviceConf);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 7d6e8c3..7c6c8f3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -70,6 +70,9 @@ under the License.
     <Compile Include="OnREEF\Client\REEFIMRUClient.cs" />
     <Compile Include="OnREEF\Driver\ConfigurationManager.cs" />
     <Compile Include="OnREEF\Driver\ContextAndServiceConfiguration.cs" />
+    <Compile Include="OnREEF\Driver\DataLoadingContext.cs" />
+    <Compile 
Include="OnREEF\Driver\MaximumNumberOfEvalutorFailuresExceededException.cs" />
+    <Compile Include="OnREEF\Driver\IMRUSystemException.cs" />
     <Compile Include="OnREEF\Driver\IMRUConstants.cs" />
     <Compile Include="OnREEF\Driver\IMRUDriver.cs" />
     <Compile Include="OnREEF\Driver\ServiceAndContextConfigurationProvider.cs" 
/>

Reply via email to