[REEF-1404] Move IMRU Task state to Context

  * Define ITaskState interface in IMRU and default implementation
  * Define MapTaskState and UpdateTaskState in IMRU example
  * Modify IMRU JobDefinition and IMRUJobDefinitionBuilder to allow the task
    state to be configured
  * Modify ConfigurationManager for managing updateTaskStateConfiguration and
    mapTaskStateConfiguration
  * Modify ServiceAndContextConfigurationProvider to set the service and state
    configurations
  * Add TaskStateService to bind it to context for holding ITaskState instance
  * Change the update and map functions in the IMRU example and tests to save
    and restore the state

JIRA:
  [REEF-1404](https://issues.apache.org/jira/browse/REEF-1404)

Pull Request:
  This closes #1131


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/f5107f0e
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/f5107f0e
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/f5107f0e

Branch: refs/heads/master
Commit: f5107f0ebd7553d1f3f0995a9472389e437b69e5
Parents: 59b766e
Author: Julia Wang <[email protected]>
Authored: Thu Oct 6 17:07:46 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Oct 6 17:56:31 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.IMRU.Examples.csproj        |   4 +-
 .../FaultTolerantPipelinedBroadcastAndReduce.cs | 220 ----------
 .../PipelinedBroadcastReduce/MapTaskState.cs    |  41 ++
 .../PipelinedBroadcastAndReduce.cs              |  30 +-
 ...elinedBroadcastAndReduceWithFaultTolerant.cs | 402 +++++++++++++++++++
 .../PipelinedBroadcastReduce/UpdateTaskState.cs |  50 +++
 lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs    |   2 +-
 .../API/IMRUJobDefinition.cs                    |  24 ++
 .../API/IMRUJobDefinitionBuilder.cs             |  28 ++
 .../OnREEF/Client/REEFIMRUClient.cs             |   5 +
 .../OnREEF/Driver/ConfigurationManager.cs       |  40 ++
 .../OnREEF/Driver/IMRUDriver.cs                 |   4 +-
 .../ServiceAndContextConfigurationProvider.cs   |  25 +-
 .../OnREEF/Driver/TaskStateService.cs           |  44 ++
 .../OnREEF/IMRUTasks/ITaskState.cs              |  27 ++
 .../SerializedMapTaskStateConfiguration.cs      |  26 ++
 .../SerializedUpdateTaskStateConfiguration.cs   |  26 ++
 .../Org.Apache.REEF.IMRU.csproj                 |   4 +
 .../IMRU/IMRUBrodcastReduceTestBase.cs          |  45 ++-
 .../Functional/IMRU/IMRUCloseTaskTest.cs        |   4 +-
 .../Functional/IMRU/TestFailMapperEvaluators.cs |  82 +++-
 .../IMRU/TestFailMapperEvaluatorsOnDispose.cs   |  11 +-
 .../IMRU/TestFailMapperEvaluatorsOnInit.cs      |  11 +-
 .../Functional/IMRU/TestFailMapperTasks.cs      |  12 +-
 .../IMRU/TestFailMapperTasksOnDispose.cs        |  11 +-
 .../IMRU/TestFailMapperTasksOnInit.cs           |  11 +-
 .../Functional/IMRU/TestFailUpdateEvaluator.cs  |  12 +-
 27 files changed, 900 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
index 3cb5fd3..ae2cb15 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
@@ -46,7 +46,9 @@ under the License.
     <Compile Include="MapperCount\IdentityMapFunction.cs" />
     <Compile Include="IntSumReduceFunction.cs" />
     <Compile Include="MapperCount\MapperCount.cs" />
-    <Compile 
Include="PipelinedBroadcastReduce\FaultTolerantPipelinedBroadcastAndReduce.cs" 
/>
+    <Compile 
Include="PipelinedBroadcastReduce\PipelinedBroadcastAndReduceWithFaultTolerant.cs"
 />
+    <Compile Include="PipelinedBroadcastReduce\MapTaskState.cs" />
+    <Compile Include="PipelinedBroadcastReduce\UpdateTaskState.cs" />
     <Compile Include="SingleIterUpdateFunction.cs" />
     <Compile Include="NaturalSum\NaturalSum.cs" />
     <Compile Include="NaturalSum\NaturalSumMapFunction.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
deleted file mode 100644
index 1b7b15f..0000000
--- 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/FaultTolerantPipelinedBroadcastAndReduce.cs
+++ /dev/null
@@ -1,220 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-// 
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-using System.Collections.Generic;
-using System.IO;
-using System.Linq;
-using Org.Apache.REEF.Common.Tasks;
-using Org.Apache.REEF.IMRU.API;
-using Org.Apache.REEF.IMRU.OnREEF.Parameters;
-using Org.Apache.REEF.Tang.Annotations;
-using Org.Apache.REEF.Tang.Implementations.Configuration;
-using Org.Apache.REEF.Tang.Implementations.Tang;
-using Org.Apache.REEF.Tang.Interface;
-using Org.Apache.REEF.Tang.Util;
-using Org.Apache.REEF.Utilities.Diagnostics;
-using Org.Apache.REEF.Utilities.Logging;
-
-namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
-{
-    /// <summary>
-    /// IMRU program that performs broadcast and reduce with fault tolerance.
-    /// </summary>
-    public sealed class FaultTolerantPipelinedBroadcastAndReduce : 
PipelinedBroadcastAndReduce
-    {
-        private static readonly Logger Logger = 
Logger.GetLogger(typeof(FaultTolerantPipelinedBroadcastAndReduce));
-
-        [Inject]
-        private FaultTolerantPipelinedBroadcastAndReduce(IIMRUClient 
imruClient) : base(imruClient)
-        {
-        }
-
-        /// <summary>
-        /// Runs the actual broadcast and reduce job with fault tolerance
-        /// </summary>
-        internal void Run(int numberofMappers, int chunkSize, int 
numIterations, int dim, int mapperMemory, int updateTaskMemory, int 
maxRetryNumberInRecovery, int totalNumberOfForcedFailures)
-        {
-            var results = _imruClient.Submit<int[], int[], int[], Stream>(
-                CreateJobDefinitionBuilder(numberofMappers, chunkSize, 
numIterations, dim, mapperMemory, updateTaskMemory)
-                    
.SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery,
 totalNumberOfForcedFailures))
-                    .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery)
-                    .Build());
-        }
-
-        /// <summary>
-        /// Build a test mapper function configuration
-        /// </summary>
-        /// <param name="maxRetryInRecovery">Number of retries done if first 
run failed.</param>
-        /// <param name="totalNumberOfForcedFailures">Number of forced failure 
times in recovery.</param>
-        private static IConfiguration BuildMapperFunctionConfig(int 
maxRetryInRecovery, int totalNumberOfForcedFailures)
-        {
-            var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
-                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
-                    GenericType<TestSenderMapFunction>.Class)
-                .Build();
-
-            var c2 = TangFactory.GetTang().NewConfigurationBuilder()
-                .BindSetEntry<TaskIdsToFail, 
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
-                .BindSetEntry<TaskIdsToFail, 
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
-                
.BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
-                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), 
maxRetryInRecovery.ToString())
-                .BindNamedParameter(typeof(TotalNumberOfForcedFailures), 
totalNumberOfForcedFailures.ToString())
-                .Build();
-
-            return Configurations.Merge(c1, c2);
-        }
-
-        [NamedParameter(Documentation = "Set of task ids which will produce 
task/evaluator failure")]
-        internal class TaskIdsToFail : Name<ISet<string>>
-        {
-        }
-
-        [NamedParameter(Documentation = "Type of failure to simulate")]
-        internal class FailureType : Name<int>
-        {
-            internal const int EvaluatorFailureDuringTaskExecution = 0;
-            internal const int TaskFailureDuringTaskExecution = 1;
-            internal const int EvaluatorFailureDuringTaskInitialization = 2;
-            internal const int TaskFailureDuringTaskInitialization = 3;
-            internal const int EvaluatorFailureDuringTaskDispose = 4;
-            internal const int TaskFailureDuringTaskDispose = 5;
-
-            internal static bool IsEvaluatorFailure(int failureType)
-            {
-                return failureType == EvaluatorFailureDuringTaskExecution ||
-                       failureType == EvaluatorFailureDuringTaskInitialization 
||
-                       failureType == EvaluatorFailureDuringTaskDispose;
-            }
-        }
-
-        [NamedParameter(Documentation = "Total number of failures in 
recovery.", DefaultValue = "2")]
-        internal class TotalNumberOfForcedFailures : Name<int>
-        {
-        }
-
-        /// <summary>
-        /// The function is to simulate Evaluator/Task failure for mapper 
evaluator
-        /// </summary>
-        internal sealed class TestSenderMapFunction : IMapFunction<int[], 
int[]>, IDisposable
-        {
-            private int _iterations;
-            private readonly string _taskId;
-            private readonly ISet<string> _taskIdsToFail;
-            private readonly int _failureType;
-            private readonly int _maxRetryInRecovery;
-            private readonly int _totalNumberOfForcedFailures;
-            private readonly int _retryIndex;
-
-            [Inject]
-            private TestSenderMapFunction(
-                [Parameter(typeof(TaskConfigurationOptions.Identifier))] 
string taskId,
-                [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail,
-                [Parameter(typeof(FailureType))] int failureType,
-                [Parameter(typeof(MaxRetryNumberInRecovery))] int 
maxRetryNumberInRecovery,
-                [Parameter(typeof(TotalNumberOfForcedFailures))] int 
totalNumberOfForcedFailures)
-            {
-                _taskId = taskId;
-                _taskIdsToFail = taskIdsToFail;
-                _failureType = failureType;
-                _maxRetryInRecovery = maxRetryNumberInRecovery;
-                _totalNumberOfForcedFailures = totalNumberOfForcedFailures;
-
-                var taskIdSplit = taskId.Split('-');
-                _retryIndex = int.Parse(taskIdSplit[taskIdSplit.Length - 1]);
-
-                Logger.Log(Level.Info,
-                    "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery 
{1}, totalNumberOfForcedFailures: {2}, RetryNumber: {3}, Failure type: {4}.",
-                    _taskId,
-                    _maxRetryInRecovery,
-                    _totalNumberOfForcedFailures,
-                    _retryIndex,
-                    _failureType);
-                foreach (var n in _taskIdsToFail)
-                {
-                    Logger.Log(Level.Info, "TestSenderMapFunction: 
taskIdsToFail: {0}", n);
-                }
-
-                if (_failureType == 
FailureType.EvaluatorFailureDuringTaskInitialization ||
-                    _failureType == 
FailureType.TaskFailureDuringTaskInitialization)
-                {
-                    SimulateFailure(0);
-                }
-            }
-
-            /// <summary>
-            /// Map function
-            /// </summary>
-            /// <param name="mapInput">integer array</param>
-            /// <returns>The same integer array</returns>
-            int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
-            {
-                _iterations++;
-                Logger.Log(Level.Info, "Received value {0} in iteration {1}.", 
mapInput[0], _iterations);
-
-                if (_failureType == 
FailureType.EvaluatorFailureDuringTaskExecution ||
-                    _failureType == FailureType.TaskFailureDuringTaskExecution)
-                {
-                    SimulateFailure(10);
-                }
-
-                if (mapInput[0] != _iterations)
-                {
-                    Exceptions.Throw(
-                        new Exception("Expected value in mappers (" + 
_iterations + ") different from actual value (" +
-                                      mapInput[0] + ")"),
-                        Logger);
-                }
-
-                return mapInput;
-            }
-
-            public void Dispose()
-            {
-                if (_failureType == 
FailureType.EvaluatorFailureDuringTaskDispose ||
-                    _failureType == FailureType.TaskFailureDuringTaskDispose)
-                {
-                    SimulateFailure(_iterations);
-                }
-            }
-
-            private void SimulateFailure(int onIteration)
-            {
-                if (_iterations == onIteration &&
-                    _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e)) 
!= null &&
-                    _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + 
_maxRetryInRecovery)) == null &&
-                    _retryIndex < _totalNumberOfForcedFailures)
-                {
-                    Logger.Log(Level.Warning,
-                        "Simulating {0} failure for taskId {1}",
-                        FailureType.IsEvaluatorFailure(_failureType) ? 
"evaluator" : "task",
-                        _taskId);
-                    if (FailureType.IsEvaluatorFailure(_failureType))
-                    {
-                        // simulate evaluator failure
-                        Environment.Exit(1);
-                    }
-                    else
-                    {
-                        // simulate task failure
-                        throw new ArgumentNullException("Simulating task 
failure");
-                    }
-                }
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/MapTaskState.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/MapTaskState.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/MapTaskState.cs
new file mode 100644
index 0000000..18d6ed0
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/MapTaskState.cs
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    internal sealed class MapTaskState<TMapInput> : ITaskState
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(MapTaskState<TMapInput>));
+
+        /// <summary>
+        /// Current value of the map function
+        /// </summary>
+        internal TMapInput CurrentValue { get; set; }
+
+        /// <summary>
+        /// Simple implementation of the Map task state
+        /// </summary>
+        [Inject]
+        private MapTaskState()
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
index f061a44..47c01c3 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs
@@ -18,6 +18,7 @@
 using System.Globalization;
 using System.IO;
 using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
 using Org.Apache.REEF.IO.PartitionedData.Random;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -51,7 +52,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
                     .Build());
         }
 
-        protected IMRUJobDefinitionBuilder CreateJobDefinitionBuilder(int 
numberofMappers, int chunkSize, int numIterations, int dim, int mapperMemory, 
int updateTaskMemory)
+        protected virtual IMRUJobDefinitionBuilder 
CreateJobDefinitionBuilder(int numberofMappers, int chunkSize, int 
numIterations, int dim, int mapperMemory, int updateTaskMemory)
         {
             return new IMRUJobDefinitionBuilder()
                     
.SetUpdateFunctionConfiguration(UpdateFunctionConfig(numberofMappers, 
numIterations, dim))
@@ -70,7 +71,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Configuration for Partitioned Dataset
         /// </summary>
-        protected static IConfiguration PartitionedDatasetConfiguration(int 
numberofMappers)
+        protected virtual IConfiguration PartitionedDatasetConfiguration(int 
numberofMappers)
         {
             return 
RandomInputDataConfiguration.ConfigurationModule.Set(RandomInputDataConfiguration.NumberOfPartitions,
                 numberofMappers.ToString()).Build();
@@ -79,7 +80,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Configuration for Reduce Function
         /// </summary>
-        protected static IConfiguration ReduceFunctionConfiguration()
+        protected virtual IConfiguration ReduceFunctionConfiguration()
         {
             return IMRUReduceFunctionConfiguration<int[]>.ConfigurationModule
                 .Set(IMRUReduceFunctionConfiguration<int[]>.ReduceFunction,
@@ -90,7 +91,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Configuration for Update Function
         /// </summary>
-        protected static IConfiguration UpdateFunctionCodecsConfiguration()
+        protected virtual IConfiguration UpdateFunctionCodecsConfiguration()
         {
             return IMRUCodecConfiguration<int[]>.ConfigurationModule
                 .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
@@ -100,7 +101,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Configuration for Map Input Codec
         /// </summary>
-        protected static IConfiguration MapInputCodecConfiguration()
+        protected virtual IConfiguration MapInputCodecConfiguration()
         {
             return IMRUCodecConfiguration<int[]>.ConfigurationModule
                 .Set(IMRUCodecConfiguration<int[]>.Codec, 
GenericType<IntArrayStreamingCodec>.Class)
@@ -110,7 +111,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Configuration for Map Output Data Converter
         /// </summary>
-        protected static IConfiguration MapOutputDataConverterConfig(int 
chunkSize)
+        protected virtual IConfiguration MapOutputDataConverterConfig(int 
chunkSize)
         {
             var dataConverterConfig2 =
                 TangFactory.GetTang()
@@ -126,7 +127,7 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Configuration for Map Input Data Converter
         /// </summary>
-        protected static IConfiguration MapInputDataConverterConfig(int 
chunkSize)
+        protected virtual IConfiguration MapInputDataConverterConfig(int 
chunkSize)
         {
             var dataConverterConfig1 =
                 TangFactory.GetTang()
@@ -142,12 +143,10 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
         /// <summary>
         /// Configuration for Update Function
         /// </summary>
-        protected static IConfiguration UpdateFunctionConfig(int 
numberofMappers, int numIterations, int dim)
+        protected IConfiguration UpdateFunctionConfig(int numberofMappers, int 
numIterations, int dim)
         {
             var updateFunctionConfig =
-                
TangFactory.GetTang().NewConfigurationBuilder(IMRUUpdateConfiguration<int[], 
int[], int[]>.ConfigurationModule
-                    .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,
-                        
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build())
+                
TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig())
                     
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
                         numIterations.ToString(CultureInfo.InvariantCulture))
                     
.BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
@@ -158,10 +157,17 @@ namespace 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
             return updateFunctionConfig;
         }
 
+        protected virtual IConfiguration BuildUpdateFunctionConfig()
+        {
+            return IMRUUpdateConfiguration<int[], int[], 
int[]>.ConfigurationModule
+                .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,
+                    
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class).Build();
+        }
+
         /// <summary>
         /// Configuration for Mapper function
         /// </summary>
-        private static IConfiguration BuildMapperFunctionConfig()
+        protected virtual IConfiguration BuildMapperFunctionConfig()
         {
             return IMRUMapConfiguration<int[], int[]>.ConfigurationModule
                 .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs
new file mode 100644
index 0000000..9ec3a69
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduceWithFaultTolerant.cs
@@ -0,0 +1,402 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// IMRU program that performs broadcast and reduce with fault tolerance.
+    /// </summary>
+    public sealed class PipelinedBroadcastAndReduceWithFaultTolerant : 
PipelinedBroadcastAndReduce
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(PipelinedBroadcastAndReduceWithFaultTolerant));
+
+        [Inject]
+        private PipelinedBroadcastAndReduceWithFaultTolerant(IIMRUClient 
imruClient) : base(imruClient)
+        {
+        }
+
+        /// <summary>
+        /// Runs the actual broadcast and reduce job with fault tolerance
+        /// </summary>
+        internal void Run(int numberofMappers, int chunkSize, int 
numIterations, int dim, int mapperMemory, int updateTaskMemory, int 
maxRetryNumberInRecovery, int totalNumberOfForcedFailures)
+        {
+            var results = _imruClient.Submit<int[], int[], int[], Stream>(
+                CreateJobDefinitionBuilder(numberofMappers, chunkSize, 
numIterations, dim, mapperMemory, updateTaskMemory)
+                    
.SetMapFunctionConfiguration(BuildMapperFunctionConfig(maxRetryNumberInRecovery,
 totalNumberOfForcedFailures))
+                    .SetMaxRetryNumberInRecovery(maxRetryNumberInRecovery)
+                    .Build());
+        }
+
+        protected override IMRUJobDefinitionBuilder 
CreateJobDefinitionBuilder(int numberofMappers, int chunkSize, int 
numIterations, int dim, int mapperMemory, int updateTaskMemory)
+        {
+            return new IMRUJobDefinitionBuilder()
+                    
.SetUpdateTaskStateConfiguration(UpdateTaskStateConfiguration())
+                    .SetMapTaskStateConfiguration(MapTaskStateConfiguration())
+                    
.SetUpdateFunctionConfiguration(UpdateFunctionConfig(numberofMappers, 
numIterations, dim))
+                    
.SetMapInputCodecConfiguration(MapInputCodecConfiguration())
+                    
.SetUpdateFunctionCodecsConfiguration(UpdateFunctionCodecsConfiguration())
+                    
.SetReduceFunctionConfiguration(ReduceFunctionConfiguration())
+                    
.SetMapInputPipelineDataConverterConfiguration(MapInputDataConverterConfig(chunkSize))
+                    
.SetMapOutputPipelineDataConverterConfiguration(MapOutputDataConverterConfig(chunkSize))
+                    
.SetPartitionedDatasetConfiguration(PartitionedDatasetConfiguration(numberofMappers))
+                    .SetJobName("BroadcastReduce")
+                    .SetNumberOfMappers(numberofMappers)
+                    .SetMapperMemory(mapperMemory)
+                    .SetUpdateTaskMemory(updateTaskMemory);
+        }
+
+        /// <summary>
+        /// Build a test mapper function configuration
+        /// </summary>
+        /// <param name="maxRetryInRecovery">Number of retries done if first 
run failed.</param>
+        /// <param name="totalNumberOfForcedFailures">Number of forced failure 
times in recovery.</param>
+        private IConfiguration BuildMapperFunctionConfig(int 
maxRetryInRecovery, int totalNumberOfForcedFailures)
+        {
+            var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<SenderMapFunctionFT>.Class)
+                .Build();
+
+            var c2 = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindSetEntry<TaskIdsToFail, 
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, 
string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                
.BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), 
maxRetryInRecovery.ToString())
+                .BindNamedParameter(typeof(TotalNumberOfForcedFailures), 
totalNumberOfForcedFailures.ToString())
+                .Build();
+
+            return Configurations.Merge(c1, c2);
+        }
+
+        /// <summary>
+        /// Configuration for Update Function
+        /// </summary>
+        protected override IConfiguration BuildUpdateFunctionConfig()
+        {
+            return IMRUUpdateConfiguration<int[], int[], 
int[]>.ConfigurationModule
+                .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,
+                    
GenericType<BroadcastSenderReduceReceiverUpdateFunctionFT>.Class).Build();
+        }
+
+        /// <summary>
+        /// Configuration for Update task state
+        /// </summary>
+        /// <returns></returns>
+        private IConfiguration UpdateTaskStateConfiguration()
+        {
+            return TangFactory.GetTang()
+                   .NewConfigurationBuilder()
+                   .BindImplementation(GenericType<ITaskState>.Class, 
GenericType<UpdateTaskState<int[], int[]>>.Class)
+                   .Build();
+        }
+
+        /// <summary>
+        /// Configuration for map task state
+        /// </summary>
+        /// <returns></returns>
+        private IConfiguration MapTaskStateConfiguration()
+        {
+            return TangFactory.GetTang()
+                   .NewConfigurationBuilder()
+                   .BindImplementation(GenericType<ITaskState>.Class, 
GenericType<MapTaskState<int[]>>.Class)
+                   .Build();
+        }
+
+        [NamedParameter(Documentation = "Set of task ids which will produce 
task/evaluator failure")]
+        internal class TaskIdsToFail : Name<ISet<string>>
+        {
+        }
+
+        /// <summary>
+        /// Named parameter selecting which failure is simulated. The constants list the
+        /// supported values: an evaluator failure or a task failure, each during task
+        /// execution, task initialization or task dispose.
+        /// </summary>
+        [NamedParameter(Documentation = "Type of failure to simulate")]
+        internal class FailureType : Name<int>
+        {
+            internal const int EvaluatorFailureDuringTaskExecution = 0;
+            internal const int TaskFailureDuringTaskExecution = 1;
+            internal const int EvaluatorFailureDuringTaskInitialization = 2;
+            internal const int TaskFailureDuringTaskInitialization = 3;
+            internal const int EvaluatorFailureDuringTaskDispose = 4;
+            internal const int TaskFailureDuringTaskDispose = 5;
+
+            /// <summary>
+            /// Tells whether a failure type value denotes an evaluator failure.
+            /// </summary>
+            /// <param name="failureType">Failure type value to classify</param>
+            /// <returns>true for the three EvaluatorFailure* constants, false for any other value</returns>
+            internal static bool IsEvaluatorFailure(int failureType)
+            {
+                switch (failureType)
+                {
+                    case EvaluatorFailureDuringTaskExecution:
+                    case EvaluatorFailureDuringTaskInitialization:
+                    case EvaluatorFailureDuringTaskDispose:
+                        return true;
+                    default:
+                        return false;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Named parameter for the total number of forced failures; defaults to 2.
+        /// SenderMapFunctionFT only simulates a failure while the retry index parsed
+        /// from the end of its task id is below this value.
+        /// </summary>
+        [NamedParameter(Documentation = "Total number of failures in 
recovery.", DefaultValue = "2")]
+        internal class TotalNumberOfForcedFailures : Name<int>
+        {
+        }
+
+        /// <summary>
+        /// Map function that simulates Evaluator/Task failures on mapper evaluators.
+        /// Depending on the configured failure type it fails in the constructor (task
+        /// initialization), in Map (task execution) or in Dispose. The last map input is
+        /// stored in the injected MapTaskState, and Map rejects an input whose first
+        /// element is smaller than the stored one.
+        /// </summary>
+        internal sealed class SenderMapFunctionFT : IMapFunction<int[], int[]>, IDisposable
+        {
+            private int _iterations;
+            private readonly string _taskId;
+            private readonly MapTaskState<int[]> _taskState;
+            private readonly ISet<string> _taskIdsToFail;
+            private readonly int _failureType;
+            private readonly int _maxRetryInRecovery;
+            private readonly int _totalNumberOfForcedFailures;
+            private readonly int _retryIndex;
+
+            /// <summary>
+            /// Captures the failure simulation settings, parses the retry index from the
+            /// last '-' separated segment of the task id, logs the settings and, for the
+            /// "during task initialization" failure types, attempts the simulated failure.
+            /// </summary>
+            /// <param name="taskId">Task id; its last '-' separated segment must be an integer</param>
+            /// <param name="taskIdsToFail">Task id prefixes for which a failure is simulated</param>
+            /// <param name="failureType">One of the FailureType constants</param>
+            /// <param name="maxRetryNumberInRecovery">Max retry number; a task id equal to a prefix followed by this number is never failed</param>
+            /// <param name="totalNumberOfForcedFailures">Failures are only simulated while the retry index is below this value</param>
+            /// <param name="taskState">Task state; must be a MapTaskState of int[], otherwise the cast throws</param>
+            [Inject]
+            private SenderMapFunctionFT(
+                [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+                [Parameter(typeof(TaskIdsToFail))] ISet<string> taskIdsToFail,
+                [Parameter(typeof(FailureType))] int failureType,
+                [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery,
+                [Parameter(typeof(TotalNumberOfForcedFailures))] int totalNumberOfForcedFailures,
+                ITaskState taskState)
+            {
+                _taskId = taskId;
+                _taskState = (MapTaskState<int[]>)taskState;
+                _taskIdsToFail = taskIdsToFail;
+                _failureType = failureType;
+                _maxRetryInRecovery = maxRetryNumberInRecovery;
+                _totalNumberOfForcedFailures = totalNumberOfForcedFailures;
+
+                // The retry index is machine-generated text, so parse it culture-independently.
+                var taskIdSplit = taskId.Split('-');
+                _retryIndex = int.Parse(taskIdSplit[taskIdSplit.Length - 1], CultureInfo.InvariantCulture);
+
+                Logger.Log(Level.Info,
+                    "TestSenderMapFunction: TaskId: {0}, _maxRetryInRecovery {1}, totalNumberOfForcedFailures: {2}, RetryNumber: {3}, Failure type: {4}.",
+                    _taskId,
+                    _maxRetryInRecovery,
+                    _totalNumberOfForcedFailures,
+                    _retryIndex,
+                    _failureType);
+                foreach (var n in _taskIdsToFail)
+                {
+                    Logger.Log(Level.Info, "TestSenderMapFunction: taskIdsToFail: {0}", n);
+                }
+
+                if (_failureType == FailureType.EvaluatorFailureDuringTaskInitialization ||
+                    _failureType == FailureType.TaskFailureDuringTaskInitialization)
+                {
+                    // _iterations is still 0 here, so this matches the onIteration argument.
+                    SimulateFailure(0);
+                }
+            }
+
+            /// <summary>
+            /// Map function
+            /// It simply takes the input, does a verification for the task state and returns the same input value.
+            /// </summary>
+            /// <param name="mapInput">integer array</param>
+            /// <returns>The same integer array</returns>
+            int[] IMapFunction<int[], int[]>.Map(int[] mapInput)
+            {
+                int[] previousValue = _taskState.CurrentValue;
+
+                // In this example, when task is re-started, with the task state management, it should continue from the previous number
+                if (previousValue != null && previousValue[0] > mapInput[0])
+                {
+                    throw new Exception(string.Format(CultureInfo.CurrentCulture,
+                        "The previous value received was {0} but the value received is {1} which is smaller than the previous value.",
+                        previousValue[0],
+                        mapInput[0]));
+                }
+
+                _taskState.CurrentValue = mapInput;
+
+                _iterations++;
+                Logger.Log(Level.Info, "Received value {0} in iteration {1}.", mapInput[0], _iterations);
+
+                if (_failureType == FailureType.EvaluatorFailureDuringTaskExecution ||
+                    _failureType == FailureType.TaskFailureDuringTaskExecution)
+                {
+                    // Execution failures are attempted once 10 inputs have been processed.
+                    SimulateFailure(10);
+                }
+
+                return mapInput;
+            }
+
+            /// <summary>
+            /// Attempts the simulated failure when the configured failure type is one of
+            /// the "during task dispose" values; otherwise does nothing.
+            /// </summary>
+            public void Dispose()
+            {
+                if (_failureType == FailureType.EvaluatorFailureDuringTaskDispose ||
+                    _failureType == FailureType.TaskFailureDuringTaskDispose)
+                {
+                    // Passing the current count makes the iteration check always match.
+                    SimulateFailure(_iterations);
+                }
+            }
+
+            /// <summary>
+            /// Simulates the configured failure when all of the following hold: the current
+            /// iteration count equals <paramref name="onIteration"/>, the task id starts with
+            /// one of the configured prefixes, the task id is not a prefix followed by the max
+            /// retry number, and the retry index is below the total number of forced failures.
+            /// An evaluator failure exits the process; a task failure throws.
+            /// </summary>
+            /// <param name="onIteration">Iteration count at which the failure is to happen</param>
+            private void SimulateFailure(int onIteration)
+            {
+                // Task ids are identifiers, so the prefix match is ordinal rather than culture-sensitive.
+                if (_iterations == onIteration &&
+                    _taskIdsToFail.FirstOrDefault(e => _taskId.StartsWith(e, StringComparison.Ordinal)) != null &&
+                    _taskIdsToFail.FirstOrDefault(e => _taskId.Equals(e + _maxRetryInRecovery)) == null &&
+                    _retryIndex < _totalNumberOfForcedFailures)
+                {
+                    Logger.Log(Level.Warning,
+                        "Simulating {0} failure for taskId {1}",
+                        FailureType.IsEvaluatorFailure(_failureType) ? "evaluator" : "task",
+                        _taskId);
+                    if (FailureType.IsEvaluatorFailure(_failureType))
+                    {
+                        // simulate evaluator failure
+                        Environment.Exit(1);
+                    }
+                    else
+                    {
+                        // simulate task failure
+                        // NOTE(review): the single-string ArgumentNullException constructor treats
+                        // the text as the parameter name; kept unchanged so the failure surfaced to
+                        // the driver stays the same.
+                        throw new ArgumentNullException("Simulating task failure");
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Update function for the integer array broadcast and reduce example. It checks
+        /// every reduced input against the expected value and records its progress in the
+        /// injected UpdateTaskState, from which Initialize resumes when state is present.
+        /// </summary>
+        internal sealed class BroadcastSenderReduceReceiverUpdateFunctionFT : IUpdateFunction<int[], int[], int[]>
+        {
+            private int _iterations;
+            private readonly int _maxIters;
+            private readonly int _dim;
+            private readonly int[] _intArr;
+            private readonly int _workers;
+            private readonly UpdateTaskState<int[], int[]> _taskState;
+
+            /// <summary>
+            /// Stores the job parameters, allocates the broadcast buffer and keeps the
+            /// task state, which must be an UpdateTaskState over int[] input and result.
+            /// </summary>
+            /// <param name="maxIters">Number of iterations after which the job is done</param>
+            /// <param name="dim">Length of the broadcast integer array</param>
+            /// <param name="numWorkers">Number of workers; used to compute the expected reduced value</param>
+            /// <param name="taskState">Task state used to save and restore progress</param>
+            [Inject]
+            private BroadcastSenderReduceReceiverUpdateFunctionFT(
+                [Parameter(typeof(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters,
+                [Parameter(typeof(BroadcastReduceConfiguration.Dimensions))] int dim,
+                [Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers,
+                ITaskState taskState)
+            {
+                _maxIters = maxIters;
+                _iterations = 0;
+                _dim = dim;
+                _intArr = new int[dim];
+                _workers = numWorkers;
+                _taskState = (UpdateTaskState<int[], int[]>)taskState;
+            }
+
+            /// <summary>
+            /// Update function. Verifies that the first element of the input equals
+            /// (iterations + 1) * workers, then either saves the next map input and asks for
+            /// another round, or saves the input as the result and finishes.
+            /// </summary>
+            /// <param name="input">Input containing sum of all mappers arrays</param>
+            /// <returns>The Update Result</returns>
+            UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Update(int[] input)
+            {
+                Logger.Log(Level.Info, "Received value {0} in iterations {1}.", input[0], _iterations + 1);
+
+                int expected = (_iterations + 1) * _workers;
+                if (input[0] != expected)
+                {
+                    throw new Exception("Expected input to update function not same as actual input");
+                }
+
+                _iterations++;
+
+                if (_iterations >= _maxIters)
+                {
+                    // Last round: the reduced input is the final result.
+                    SaveResult(input);
+                    return UpdateResult<int[], int[]>.Done(input);
+                }
+
+                FillBuffer(_iterations + 1);
+                SaveState(_intArr);
+                return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+            }
+
+            /// <summary>
+            /// Initialize function. Without saved state it sends an integer array filled
+            /// with 1 to all mappers. With a saved result it restores it and reports done;
+            /// with a saved input it restores it and asks for another round. In both
+            /// restore cases the iteration count is taken from the task state.
+            /// </summary>
+            /// <returns>Map input</returns>
+            UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Initialize()
+            {
+                int[] savedResult = _taskState.Result;
+                if (savedResult != null)
+                {
+                    Restore(savedResult);
+                    _iterations = _taskState.Iterations;
+                    return UpdateResult<int[], int[]>.Done(_intArr);
+                }
+
+                int[] savedInput = _taskState.Input;
+                if (savedInput != null)
+                {
+                    Restore(savedInput);
+                    _iterations = _taskState.Iterations;
+                    return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+                }
+
+                FillBuffer(1);
+                return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+            }
+
+            /// <summary>
+            /// Sets the first _dim entries of the broadcast buffer to the given value
+            /// </summary>
+            /// <param name="value">Value written to each entry</param>
+            private void FillBuffer(int value)
+            {
+                for (int index = 0; index < _dim; index++)
+                {
+                    _intArr[index] = value;
+                }
+            }
+
+            /// <summary>
+            /// Records the current iteration count and the given map input in the task state
+            /// </summary>
+            /// <param name="value">Map input to keep; the reference is stored, not a copy</param>
+            private void SaveState(int[] value)
+            {
+                _taskState.Iterations = _iterations;
+                _taskState.Input = value;
+                Logger.Log(Level.Info, "State saved: {0}", _taskState.Input[0]);
+            }
+
+            /// <summary>
+            /// Records the current iteration count and the given result in the task state
+            /// </summary>
+            /// <param name="value">Result to keep; the reference is stored, not a copy</param>
+            private void SaveResult(int[] value)
+            {
+                _taskState.Iterations = _iterations;
+                _taskState.Result = value;
+                Logger.Log(Level.Info, "Result saved: {0}", _taskState.Result[0]);
+            }
+
+            /// <summary>
+            /// Copies the first _dim entries of the given array into the broadcast buffer
+            /// </summary>
+            /// <param name="d">Source array, expected to hold at least _dim entries</param>
+            private void Restore(int[] d)
+            {
+                for (int index = 0; index < _dim; index++)
+                {
+                    _intArr[index] = d[index];
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskState.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskState.cs
 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskState.cs
new file mode 100644
index 0000000..ed0c3ba
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/UpdateTaskState.cs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce
+{
+    /// <summary>
+    /// Sample implementation of ITaskState that holds update task state:
+    /// an input value, a result value and an iteration counter.
+    /// </summary>
+    /// <typeparam name="TMapInput">Type of the value stored in Input</typeparam>
+    /// <typeparam name="TResult">Type of the value stored in Result</typeparam>
+    internal sealed class UpdateTaskState<TMapInput, TResult> : ITaskState
+    {
+        // NOTE(review): this logger is not referenced anywhere in the class as shown.
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(UpdateTaskState<TMapInput, TResult>));
+
+        /// <summary>
+        /// Input value kept in the state.
+        /// NOTE(review): presumably the latest map input produced by the update function - confirm against the caller.
+        /// </summary>
+        internal TMapInput Input { get; set; }
+
+        /// <summary>
+        /// Result value kept in the state.
+        /// NOTE(review): presumably the final result of the update function - confirm against the caller.
+        /// </summary>
+        internal TResult Result { get; set; }
+
+        /// <summary>
+        /// Keep the current iteration number
+        /// </summary>
+        internal int Iterations { get;  set; }
+
+        /// <summary>
+        /// Simple constructor for UpdateTaskState. It is private and marked for
+        /// injection; all properties start at their default values.
+        /// </summary>
+        [Inject]
+        private UpdateTaskState()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
index d8b7bb8..746e5e2 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Run.cs
@@ -119,7 +119,7 @@ namespace Org.Apache.REEF.IMRU.Examples
 
             if (faultTolerant)
             {
-                var broadcastReduceFtExample = 
injector.GetInstance<FaultTolerantPipelinedBroadcastAndReduce>();
+                var broadcastReduceFtExample = 
injector.GetInstance<PipelinedBroadcastAndReduceWithFaultTolerant>();
                 broadcastReduceFtExample.Run(numNodes - 1, chunkSize, 
iterations, dims, mapperMemory, updateTaskMemory, maxRetryNumberInRecovery, 
totalNumberOfForcedFailures);
             }
             else

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index d42bf9b..ffec4a6 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -27,6 +27,8 @@ namespace Org.Apache.REEF.IMRU.API
     public sealed class IMRUJobDefinition
     {
         private readonly string _jobName;
+        private readonly IConfiguration _updateTaskStateConfiguration;
+        private readonly IConfiguration _mapTaskStateConfiguration;
         private readonly IConfiguration _mapFunctionConfiguration;
         private readonly IConfiguration _mapInputCodecConfiguration;
         private readonly IConfiguration _updateFunctionCodecsConfiguration;
@@ -48,6 +50,8 @@ namespace Org.Apache.REEF.IMRU.API
         /// <summary>
         /// Constructor
         /// </summary>
+        /// <param name="updateTaskStateConfiguration">Update task state 
configuration</param>
+        /// <param name="mapTaskStateConfiguration">Map task state 
configuration</param>
         /// <param name="mapFunctionConfiguration">Map function 
configuration</param>
         /// <param name="mapInputCodecConfiguration">Map input codec 
configuration</param>
         /// <param name="updateFunctionCodecsConfiguration">codec 
configuration for update 
@@ -71,6 +75,8 @@ namespace Org.Apache.REEF.IMRU.API
         /// <param name="jobName">Job name</param>
         /// <param name="invokeGC">Whether to call garbage collector after 
each iteration</param>
         internal IMRUJobDefinition(
+            IConfiguration updateTaskStateConfiguration,
+            IConfiguration mapTaskStateConfiguration,
             IConfiguration mapFunctionConfiguration,
             IConfiguration mapInputCodecConfiguration,
             IConfiguration updateFunctionCodecsConfiguration,
@@ -90,6 +96,8 @@ namespace Org.Apache.REEF.IMRU.API
             string jobName,
             bool invokeGC)
         {
+            _updateTaskStateConfiguration = updateTaskStateConfiguration;
+            _mapTaskStateConfiguration = mapTaskStateConfiguration;
             _mapFunctionConfiguration = mapFunctionConfiguration;
             _mapInputCodecConfiguration = mapInputCodecConfiguration;
             _updateFunctionCodecsConfiguration = 
updateFunctionCodecsConfiguration;
@@ -119,6 +127,22 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Configuration of update task state, as passed to the constructor
+        /// </summary>
+        internal IConfiguration UpdateTaskStateConfiguration
+        {
+            get { return _updateTaskStateConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of map task state, as passed to the constructor
+        /// </summary>
+        internal IConfiguration MapTaskStateConfiguration
+        {
+            get { return _mapTaskStateConfiguration; }
+        }
+
+        /// <summary>
         /// Configuration of map function
         /// </summary>
         internal IConfiguration MapFunctionConfiguration

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs 
b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index 5d56fde..e1af1a3 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -39,6 +39,8 @@ namespace Org.Apache.REEF.IMRU.API
         private int _coresPerMapper;
         private int _updateTaskCores;
         private int _maxRetryNumberInRecovery;
+        private IConfiguration _updateTaskStateConfiguration;
+        private IConfiguration _mapTaskStateConfiguration;
         private IConfiguration _mapFunctionConfiguration;
         private IConfiguration _mapInputCodecConfiguration;
         private IConfiguration _updateFunctionCodecsConfiguration;
@@ -59,6 +61,8 @@ namespace Org.Apache.REEF.IMRU.API
         /// </summary>
         public IMRUJobDefinitionBuilder()
         {
+            _updateTaskStateConfiguration = EmptyConfiguration;
+            _mapTaskStateConfiguration = EmptyConfiguration;
             _mapInputPipelineDataConverterConfiguration = EmptyConfiguration;
             _mapOutputPipelineDataConverterConfiguration = EmptyConfiguration;
             _partitionedDatasetConfiguration = EmptyConfiguration;
@@ -84,6 +88,28 @@ namespace Org.Apache.REEF.IMRU.API
         }
 
         /// <summary>
+        /// Sets configuration of map task state. When this is never called, the
+        /// EmptyConfiguration assigned in the builder's constructor stays in effect.
+        /// </summary>
+        /// <param name="mapTaskStateConfiguration">Configuration for map task state</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder 
SetMapTaskStateConfiguration(IConfiguration mapTaskStateConfiguration)
+        {
+            _mapTaskStateConfiguration = mapTaskStateConfiguration;
+            return this;
+        }
+
+        /// <summary>
+        /// Sets configuration of update task state. When this is never called, the
+        /// EmptyConfiguration assigned in the builder's constructor stays in effect.
+        /// </summary>
+        /// <param name="updateTaskStateConfiguration">Configuration for update task state</param>
+        /// <returns>this</returns>
+        public IMRUJobDefinitionBuilder 
SetUpdateTaskStateConfiguration(IConfiguration updateTaskStateConfiguration)
+        {
+            _updateTaskStateConfiguration = updateTaskStateConfiguration;
+            return this;
+        }
+
+        /// <summary>
         /// Sets configuration of map function
         /// </summary>
         /// <param name="mapFunctionConfiguration">Configuration</param>
@@ -318,6 +344,8 @@ namespace Org.Apache.REEF.IMRU.API
             }
 
             return new IMRUJobDefinition(
+                _updateTaskStateConfiguration,
+                _mapTaskStateConfiguration,
                 _mapFunctionConfiguration,
                 _mapInputCodecConfiguration,
                 _updateFunctionCodecsConfiguration,

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 223e033..8ba132e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -37,6 +37,7 @@ using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
+using TraceLevel = System.Diagnostics.TraceLevel;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.Client
 {
@@ -122,6 +123,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client
                 jobDefinition.PartitionedDatasetConfiguration,
                 overallPerMapConfig
             })
+                
.BindNamedParameter(typeof(SerializedUpdateTaskStateConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.UpdateTaskStateConfiguration))
+                
.BindNamedParameter(typeof(SerializedMapTaskStateConfiguration),
+                    
_configurationSerializer.ToString(jobDefinition.MapTaskStateConfiguration))
                 .BindNamedParameter(typeof(SerializedMapConfiguration),
                     
_configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration))
                 .BindNamedParameter(typeof(SerializedUpdateConfiguration),

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
index b9d0542..192cdc4 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ConfigurationManager.cs
@@ -32,6 +32,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(ConfigurationManager));
 
+        private readonly IConfiguration _updateTaskStateConfiguration;
+        private readonly IConfiguration _mapTaskStateConfiguration;
         private readonly IConfiguration _mapFunctionConfiguration;
         private readonly IConfiguration _mapInputCodecConfiguration;
         private readonly IConfiguration _updateFunctionCodecsConfiguration;
@@ -44,6 +46,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         [Inject]
         private ConfigurationManager(
             AvroConfigurationSerializer configurationSerializer,
+            [Parameter(typeof(SerializedUpdateTaskStateConfiguration))] string 
updateTaskStateConfig,
+            [Parameter(typeof(SerializedMapTaskStateConfiguration))] string 
mapTaskStateConfig,
             [Parameter(typeof(SerializedMapConfiguration))] string mapConfig,
             [Parameter(typeof(SerializedReduceConfiguration))] string 
reduceConfig,
             [Parameter(typeof(SerializedUpdateConfiguration))] string 
updateConfig,
@@ -55,6 +59,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
             try
             {
+                _updateTaskStateConfiguration = 
configurationSerializer.FromString(updateTaskStateConfig);
+            }
+            catch (Exception)
+            {
+                Logger.Log(Level.Error, "Unable to deserialize update task 
state configuration");
+                throw;
+            }
+
+            try
+            {
+                _mapTaskStateConfiguration = 
configurationSerializer.FromString(mapTaskStateConfig);
+            }
+            catch (Exception)
+            {
+                Logger.Log(Level.Error, "Unable to deserialize map task state 
configuration");
+                throw;
+            }
+
+            try
+            {
                 _mapFunctionConfiguration = 
configurationSerializer.FromString(mapConfig);
             }
             catch (Exception e)
@@ -131,6 +155,22 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
+        /// Configuration of update task state, deserialized in the constructor from
+        /// the SerializedUpdateTaskStateConfiguration named parameter
+        /// </summary>
+        internal IConfiguration UpdateTaskStateConfiguration
+        {
+            get { return _updateTaskStateConfiguration; }
+        }
+
+        /// <summary>
+        /// Configuration of map task state, deserialized in the constructor from
+        /// the SerializedMapTaskStateConfiguration named parameter
+        /// </summary>
+        internal IConfiguration MapTaskStateConfiguration
+        {
+            get { return _mapTaskStateConfiguration; }
+        }
+
+        /// <summary>
         /// Configuration of map function
         /// </summary>
         internal IConfiguration MapFunctionConfiguration

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 50424c7..a468e5e 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -159,7 +159,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             _systemState = new SystemStateMachine();
             _serviceAndContextConfigurationProvider =
-                new ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput, TPartitionType>(dataSet);
+                new ServiceAndContextConfigurationProvider<TMapInput, 
TMapOutput, TPartitionType>(dataSet, configurationManager);
 
             var msg =
                 string.Format(CultureInfo.InvariantCulture, "map task 
memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}, 
maxRetry {4}, allowedFailedEvaluators {5}.",
@@ -793,7 +793,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Generates map task configuration given the active context. S
+        /// Generates map task configuration given the active context.
         /// Merge configurations of all the inputs to the MapTaskHost.
         /// </summary>
         /// <param name="activeContext">Active context to which task needs to 
be submitted</param>

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
index 24a2b9b..0d4e27b 100644
--- 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs
@@ -22,6 +22,7 @@ using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Common.Events;
 using Org.Apache.REEF.Common.Services;
 using Org.Apache.REEF.IO.PartitionedData;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Attributes;
@@ -44,18 +45,21 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private readonly Dictionary<string, string> _partitionIdProvider = new 
Dictionary<string, string>();
         private readonly Stack<string> _partitionDescriptorIds = new 
Stack<string>();
         private readonly IPartitionedInputDataSet _dataset;
+        private readonly ConfigurationManager _configurationManager;
 
         /// <summary>
         /// Constructs the object witch maintains partitionDescriptor Ids so 
that to provide proper data load configuration 
         /// </summary>
-        /// <param name="dataset"></param>
-        internal 
ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset)
+        /// <param name="dataset">partition input dataset</param>
+        /// <param name="configurationManager">Configuration manager that 
holds configurations for context and tasks</param>
+        internal 
ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset, 
ConfigurationManager configurationManager)
         {
             _dataset = dataset;
             foreach (var descriptor in _dataset)
             {
                 _partitionDescriptorIds.Push(descriptor.Id);
             }
+            _configurationManager = configurationManager;
         }
 
         /// <summary>
@@ -83,10 +87,15 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         internal ContextAndServiceConfiguration 
GetContextConfigurationForMasterEvaluatorById(string evaluatorId)
         {
             Logger.Log(Level.Info, "Getting root context and service 
configuration for master");
+
+            var serviceConf = ServiceConfiguration.ConfigurationModule
+                .Set(ServiceConfiguration.Services, 
GenericType<TaskStateService>.Class)
+                .Build();
+
             return new ContextAndServiceConfiguration(
                 
ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier,
                     IMRUConstants.MasterContextId).Build(),
-                TangFactory.GetTang().NewConfigurationBuilder().Build());
+                Configurations.Merge(serviceConf, 
_configurationManager.UpdateTaskStateConfiguration));
         }
 
         /// <summary>
@@ -152,7 +161,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <param name="partitionDescriptor"></param>
         /// <param name="evaluatorId"></param>
         /// <returns></returns>
-        private static ContextAndServiceConfiguration 
GetDataLoadingContextAndServiceConfiguration(
+        private ContextAndServiceConfiguration 
GetDataLoadingContextAndServiceConfiguration(
             IPartitionDescriptor partitionDescriptor,
             string evaluatorId)
         {
@@ -166,9 +175,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
             var serviceConf =
                 TangFactory.GetTang()
-                    
.NewConfigurationBuilder(ServiceConfiguration.ConfigurationModule.Build(),
+                    .NewConfigurationBuilder(
+                        ServiceConfiguration.ConfigurationModule
+                        .Set(ServiceConfiguration.Services, 
GenericType<TaskStateService>.Class)
+                        .Build(),
                         dataLoadingContextConf,
-                        partitionDescriptor.GetPartitionConfiguration())
+                        partitionDescriptor.GetPartitionConfiguration(),
+                        _configurationManager.MapTaskStateConfiguration)
                     .Build();
 
             var contextConf = ContextConfiguration.ConfigurationModule

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskStateService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskStateService.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskStateService.cs
new file mode 100644
index 0000000..fcbf846
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskStateService.cs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    internal sealed class TaskStateService
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TaskStateService));
+
+        /// <summary>
+        /// The reference of ITaskState to hold the task state
+        /// </summary>
+        private readonly ITaskState _state;
+
+        [Inject]
+        private TaskStateService(ITaskState state)
+        {
+            _state = state;
+        }
+
+        [Inject]
+        private TaskStateService()
+        {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/ITaskState.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/ITaskState.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/ITaskState.cs
new file mode 100644
index 0000000..92eba07
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/ITaskState.cs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+    /// <summary>
+    /// Interface for task state
+    /// Client can implement this interface and inject it into context service 
and task function to save the current task state
+    /// </summary>
+    public interface ITaskState
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapTaskStateConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapTaskStateConfiguration.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapTaskStateConfiguration.cs
new file mode 100644
index 0000000..0954ccb
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedMapTaskStateConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    [NamedParameter("The serialized configuration for map task state.")]
+    internal sealed class SerializedMapTaskStateConfiguration : Name<string>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateTaskStateConfiguration.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateTaskStateConfiguration.cs
 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateTaskStateConfiguration.cs
new file mode 100644
index 0000000..159b244
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedUpdateTaskStateConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    [NamedParameter("The serialized configuration for update task state.")]
+    internal sealed class SerializedUpdateTaskStateConfiguration : Name<string>
+    {
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index d89d5a9..f4da45f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -90,9 +90,11 @@ under the License.
     <Compile 
Include="OnREEF\Driver\StateMachine\TaskStateTransitionException.cs" />
     <Compile Include="OnREEF\Driver\TaskInfo.cs" />
     <Compile Include="OnREEF\Driver\TaskManager.cs" />
+    <Compile Include="OnREEF\Driver\TaskStateService.cs" />
     <Compile Include="OnREEF\IMRUTasks\IMRUTaskAppException.cs" />
     <Compile Include="OnREEF\IMRUTasks\IMRUTaskGroupCommunicationException.cs" 
/>
     <Compile Include="OnREEF\IMRUTasks\IMRUTaskSystemException.cs" />
+    <Compile Include="OnREEF\IMRUTasks\ITaskState.cs" />
     <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
     <Compile Include="OnREEF\IMRUTasks\TaskCloseCoordinator.cs" />
     <Compile Include="OnREEF\IMRUTasks\TaskHostBase.cs" />
@@ -108,6 +110,7 @@ under the License.
     <Compile Include="OnREEF\Parameters\MaxRetryNumberInRecovery.cs" />
     <Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" />
     <Compile Include="OnREEF\Parameters\MemoryPerMapper.cs" />
+    <Compile 
Include="OnREEF\Parameters\SerializedMapTaskStateConfiguration.cs" />
     <Compile 
Include="OnREEF\Parameters\SerializedResultHandlerConfiguration.cs" />
     <Compile Include="OnREEF\Parameters\SerializedMapConfiguration.cs" />
     <Compile 
Include="OnREEF\Parameters\SerializedMapInputCodecConfiguration.cs" />
@@ -116,6 +119,7 @@ under the License.
     <Compile Include="OnREEF\Parameters\SerializedReduceConfiguration.cs" />
     <Compile Include="OnREEF\Parameters\SerializedUpdateConfiguration.cs" />
     <Compile 
Include="OnREEF\Parameters\SerializedUpdateFunctionCodecsConfiguration.cs" />
+    <Compile 
Include="OnREEF\Parameters\SerializedUpdateTaskStateConfiguration.cs" />
     <Compile Include="OnREEF\ResultHandler\DefaultResultHandler.cs" />
     <Compile Include="OnREEF\ResultHandler\ResultOutputLocation.cs" />
     <Compile Include="OnREEF\ResultHandler\WriteResultHandler.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
index f20ec31..8964836 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
@@ -48,7 +48,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
     public abstract class IMRUBrodcastReduceTestBase : ReefFunctionalTest
     {
         protected static readonly Logger Logger = 
Logger.GetLogger(typeof(IMRUBrodcastReduceTestBase));
-        private const string IMRUJobName = "IMRUBroadcastReduce";
+        protected const string IMRUJobName = "IMRUBroadcastReduce";
 
         protected const string CompletedTaskMessage = "CompletedTaskMessage";
         protected const string RunningTaskMessage = "RunningTaskMessage";
@@ -136,6 +136,10 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 jobDefinition.PartitionedDatasetConfiguration,
                 overallPerMapConfig
             })
+                
.BindNamedParameter(typeof(SerializedUpdateTaskStateConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.UpdateTaskStateConfiguration))
+                
.BindNamedParameter(typeof(SerializedMapTaskStateConfiguration),
+                    
configurationSerializer.ToString(jobDefinition.MapTaskStateConfiguration))
                 .BindNamedParameter(typeof(SerializedMapConfiguration),
                     
configurationSerializer.ToString(jobDefinition.MapFunctionConfiguration))
                 .BindNamedParameter(typeof(SerializedUpdateConfiguration),
@@ -203,7 +207,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="updateTaskMemory"></param>
         /// <param name="numberOfRetryInRecovery"></param>
         /// <returns></returns>
-        protected IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int 
numberofMappers,
+        protected virtual IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int 
numberofMappers,
             int chunkSize,
             int numIterations,
             int dim,
@@ -211,19 +215,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int updateTaskMemory,
             int numberOfRetryInRecovery)
         {
-            var updateFunctionConfig =
-                
TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig())
-                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
-                        numIterations.ToString(CultureInfo.InvariantCulture))
-                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
-                        dim.ToString(CultureInfo.InvariantCulture))
-                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers),
-                        numberofMappers.ToString(CultureInfo.InvariantCulture))
-                    .Build();
-
             return new IMRUJobDefinitionBuilder()
                 .SetMapFunctionConfiguration(BuildMapperFunctionConfig())
-                .SetUpdateFunctionConfiguration(updateFunctionConfig)
+                
.SetUpdateFunctionConfiguration(BuildUpdateFunctionConfiguration(numberofMappers,
 numIterations, dim))
                 .SetMapInputCodecConfiguration(BuildMapInputCodecConfig())
                 
.SetUpdateFunctionCodecsConfiguration(BuildUpdateFunctionCodecsConfig())
                 .SetReduceFunctionConfiguration(BuildReduceFunctionConfig())
@@ -239,6 +233,27 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         }
 
         /// <summary>
+        /// Build update function configuration. Subclass can override it.
+        /// </summary>
+        /// <param name="numberofMappers"></param>
+        /// <param name="numIterations"></param>
+        /// <param name="dim"></param>
+        /// <returns></returns>
+        protected virtual IConfiguration BuildUpdateFunctionConfiguration(int 
numberofMappers, int numIterations, int dim)
+        {
+            var updateFunctionConfig =
+                
TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfigModule())
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumberOfIterations),
+                        numIterations.ToString(CultureInfo.InvariantCulture))
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.Dimensions),
+                        dim.ToString(CultureInfo.InvariantCulture))
+                    
.BindNamedParameter(typeof(BroadcastReduceConfiguration.NumWorkers),
+                        numberofMappers.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+            return updateFunctionConfig;
+        }
+
+        /// <summary>
         ///  Data Converter Configuration. Subclass can override it to have 
its own test Data Converter.
         /// </summary>
         /// <param name="chunkSize"></param>
@@ -267,10 +282,10 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         }
 
         /// <summary>
-        /// Update function configuration. Subclass can override it to have 
its own test function.
+        /// Set update function to IMRUUpdateConfiguration configuration 
module. Subclass can override it to set a different function.
         /// </summary>
         /// <returns></returns>
-        protected virtual IConfiguration BuildUpdateFunctionConfig()
+        protected virtual IConfiguration BuildUpdateFunctionConfigModule()
         {
             return IMRUUpdateConfiguration<int[], int[], 
int[]>.ConfigurationModule
                 .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
index 433fce1..225ae42 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
@@ -131,9 +131,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// Update function configuration. Add TcpConfiguration to the base 
configuration.
         /// </summary>
         /// <returns></returns>
-        protected override IConfiguration BuildUpdateFunctionConfig()
+        protected override IConfiguration BuildUpdateFunctionConfigModule()
         {
-            return Configurations.Merge(GetTcpConfiguration(), 
base.BuildUpdateFunctionConfig());
+            return Configurations.Merge(GetTcpConfiguration(), 
base.BuildUpdateFunctionConfigModule());
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/f5107f0e/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
index 29c3e25..1fb08ad 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -19,6 +19,7 @@ using System.Diagnostics;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
 using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.Network;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
@@ -35,8 +36,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         protected const int NumberOfRetry = 3;
 
         /// <summary>
-        /// This test fails two evaluators during task execution stage on each 
retry except last. 
+        /// This test fails two evaluators during task execution stage on each 
retry except last.
         /// Job is retried until success. 
+        /// In each retry, when the task is restarted on an existing 
evaluator, the iteration will continue from the previous task state.
         /// </summary>
         [Fact]
         public virtual void TestFailedMapperOnLocalRuntime()
@@ -124,6 +126,44 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         }
 
         /// <summary>
+        /// Create IMRU Job Definition with IMRU required configurations
+        /// </summary>
+        /// <param name="numberofMappers"></param>
+        /// <param name="chunkSize"></param>
+        /// <param name="numIterations"></param>
+        /// <param name="dim"></param>
+        /// <param name="mapperMemory"></param>
+        /// <param name="updateTaskMemory"></param>
+        /// <param name="numberOfRetryInRecovery"></param>
+        /// <returns></returns>
+        protected override IMRUJobDefinition 
CreateIMRUJobDefinitionBuilder(int numberofMappers,
+            int chunkSize,
+            int numIterations,
+            int dim,
+            int mapperMemory,
+            int updateTaskMemory,
+            int numberOfRetryInRecovery)
+        {
+            return new IMRUJobDefinitionBuilder()
+                
.SetUpdateTaskStateConfiguration(UpdateTaskStateConfiguration())
+                .SetMapTaskStateConfiguration(MapTaskStateConfiguration())
+                .SetMapFunctionConfiguration(BuildMapperFunctionConfig())
+                
.SetUpdateFunctionConfiguration(BuildUpdateFunctionConfiguration(numberofMappers,
 numIterations, dim))
+                .SetMapInputCodecConfiguration(BuildMapInputCodecConfig())
+                
.SetUpdateFunctionCodecsConfiguration(BuildUpdateFunctionCodecsConfig())
+                .SetReduceFunctionConfiguration(BuildReduceFunctionConfig())
+                
.SetMapInputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
+                
.SetMapOutputPipelineDataConverterConfiguration(BuildDataConverterConfig(chunkSize))
+                
.SetPartitionedDatasetConfiguration(BuildPartitionedDatasetConfiguration(numberofMappers))
+                .SetJobName(IMRUJobName)
+                .SetNumberOfMappers(numberofMappers)
+                .SetMapperMemory(mapperMemory)
+                .SetUpdateTaskMemory(updateTaskMemory)
+                .SetMaxRetryNumberInRecovery(numberOfRetryInRecovery)
+                .Build();
+        }
+
+        /// <summary>
         /// Mapper function configuration. Subclass can override it to have 
its own test function.
         /// </summary>
         /// <returns></returns>
@@ -131,29 +171,29 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         {
             var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
                 .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
-                    
GenericType<FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction>.Class)
                   
+                    
GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.SenderMapFunctionFT>.Class)
                 .Build();
 
             var c2 = TangFactory.GetTang().NewConfigurationBuilder()
-                
.BindSetEntry<FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail, 
string>(GenericType<FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail>.Class,
 "IMRUMap-RandomInputPartition-2-")
-                
.BindSetEntry<FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail, 
string>(GenericType<FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail>.Class,
 "IMRUMap-RandomInputPartition-3-")
-                
.BindIntNamedParam<FaultTolerantPipelinedBroadcastAndReduce.FailureType>(FaultTolerantPipelinedBroadcastAndReduce.FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+                
.BindSetEntry<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail, 
string>(GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail>.Class,
 "IMRUMap-RandomInputPartition-2-")
+                
.BindSetEntry<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail, 
string>(GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.TaskIdsToFail>.Class,
 "IMRUMap-RandomInputPartition-3-")
+                
.BindIntNamedParam<PipelinedBroadcastAndReduceWithFaultTolerant.FailureType>(PipelinedBroadcastAndReduceWithFaultTolerant.FailureType.EvaluatorFailureDuringTaskExecution.ToString())
                 .BindNamedParameter(typeof(MaxRetryNumberInRecovery), 
NumberOfRetry.ToString())
-                
.BindNamedParameter(typeof(FaultTolerantPipelinedBroadcastAndReduce.TotalNumberOfForcedFailures),
 NumberOfRetry.ToString())
+                
.BindNamedParameter(typeof(PipelinedBroadcastAndReduceWithFaultTolerant.TotalNumberOfForcedFailures),
 NumberOfRetry.ToString())
                 .Build();
 
             return Configurations.Merge(c1, c2, GetTcpConfiguration());
         }
 
         /// <summary>
-        /// Update function configuration. Subclass can override it to have 
its own test function.
+        /// Set update function to IMRUUpdateConfiguration configuration 
module. Return it with TCP configuration.
         /// </summary>
         /// <returns></returns>
-        protected override IConfiguration BuildUpdateFunctionConfig()
+        protected override IConfiguration BuildUpdateFunctionConfigModule()
         {
             var c = IMRUUpdateConfiguration<int[], int[], 
int[]>.ConfigurationModule
                 .Set(IMRUUpdateConfiguration<int[], int[], 
int[]>.UpdateFunction,
-                    
GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class)
+                    
GenericType<PipelinedBroadcastAndReduceWithFaultTolerant.BroadcastSenderReduceReceiverUpdateFunctionFT>.Class)
                 .Build();
 
             return Configurations.Merge(c, GetTcpConfiguration());
@@ -170,5 +210,29 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .Set(TcpClientConfigurationModule.SleepTime, "1000")
                 .Build();
         }
+
+        /// <summary>
+        /// Configuration for Map task state
+        /// </summary>
+        /// <returns></returns>
+        private IConfiguration MapTaskStateConfiguration()
+        {
+            return TangFactory.GetTang()
+                   .NewConfigurationBuilder()
+                   .BindImplementation(GenericType<ITaskState>.Class, 
GenericType<MapTaskState<int[]>>.Class)
+                   .Build();
+        }
+
+        /// <summary>
+        /// Configuration for Update task state
+        /// </summary>
+        /// <returns></returns>
+        private IConfiguration UpdateTaskStateConfiguration()
+        {
+            return TangFactory.GetTang()
+                   .NewConfigurationBuilder()
+                   .BindImplementation(GenericType<ITaskState>.Class, 
GenericType<UpdateTaskState<int[], int[]>>.Class)
+                   .Build();
+        }
     }
 }
\ No newline at end of file

Reply via email to