Repository: reef
Updated Branches:
  refs/heads/master d116d94e6 -> b14c8cd81


http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index af4fbf1..01a2bdb 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -17,6 +17,8 @@
 
 using System;
 using System.IO;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
 using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
@@ -77,14 +79,17 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <param name="resultHandler">Result handler</param>
         /// <param name="taskCloseCoordinator">Task close Coordinator</param>
         /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param>
+        /// <param name="taskId">Identifier of this task; used in log messages</param>
         [Inject]
         private UpdateTaskHost(
             IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask,
             IGroupCommClient groupCommunicationsClient,
             IIMRUResultHandler<TResult> resultHandler,
             TaskCloseCoordinator taskCloseCoordinator,
-            [Parameter(typeof(InvokeGC))] bool invokeGC)
+            [Parameter(typeof(InvokeGC))] bool invokeGC,
+            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId)
         {
+            Logger.Log(Level.Info, "Entering constructor of UpdateTaskHost for task id {0}", taskId);
             _updateTask = updateTask;
             _groupCommunicationsClient = groupCommunicationsClient;
             var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
@@ -95,6 +100,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             _resultHandler = resultHandler;
             _taskCloseCoordinator = taskCloseCoordinator;
             _cancellationSource = new CancellationTokenSource();
+            Logger.Log(Level.Info, "UpdateTaskHost initialized.");
         }
 
         /// <summary>
@@ -104,6 +110,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         /// <returns></returns>
         public byte[] Call(byte[] memento)
         {
+            Logger.Log(Level.Info, "Entering UpdateTaskHost Call().");
             var updateResult = _updateTask.Initialize();
             int iterNo = 0;
             try
@@ -148,27 +155,82 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                     "Received OperationCanceledException in UpdateTaskHost with message: {0}.",
                     e.Message);
             }
-            catch (IOException e)
+            catch (Exception e)
             {
-                Logger.Log(Level.Error, "Received IOException in UpdateTaskHost with message: {0}.", e.Message);
-                if (!_cancellationSource.IsCancellationRequested)
+                if (e is IOException || e is TcpClientConnectionException || e is RemotingException ||
+                    e is SocketException)
+                {
+                    Logger.Log(Level.Error,
+                        "Received Exception {0} in UpdateTaskHost with message: {1}. The cancellation token is: {2}.",
+                        e.GetType(),
+                        e.Message,
+                        _cancellationSource.IsCancellationRequested);
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        Logger.Log(Level.Error,
+                            "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
+                            _cancellationSource.IsCancellationRequested);
+                        throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                    }
+                }
+                else if (e is AggregateException)
+                {
+                    Logger.Log(Level.Error,
+                        "Received AggregateException. The cancellation token is: {0}.",
+                        _cancellationSource.IsCancellationRequested);
+                    if (e.InnerException != null)
+                    {
+                        Logger.Log(Level.Error,
+                            "InnerException {0}, with message {1}.",
+                            e.InnerException.GetType(),
+                            e.InnerException.Message);
+                    }
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        if (e.InnerException is IOException)
+                        {
+                            Logger.Log(Level.Error,
+                                "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
+                                _cancellationSource.IsCancellationRequested);
+                            throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                        }
+                        else
+                        {
+                            // Rethrow with "throw;" rather than "throw e;" so the original stack trace is preserved.
+                            throw;
+                        }
+                    }
+                }
+                else
                 {
-                    throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                    Logger.Log(Level.Error,
+                        "UpdateTaskHost is throwing Exception {0}, message {1} with cancellation token: {2} and StackTrace {3}.",
+                        e.GetType(),
+                        e.Message,
+                        _cancellationSource.IsCancellationRequested,
+                        e.StackTrace);
+                    if (!_cancellationSource.IsCancellationRequested)
+                    {
+                        // Rethrow with "throw;" rather than "throw e;" so the original stack trace is preserved.
+                        throw;
+                    }
                 }
             }
-            catch (TcpClientConnectionException e)
+            finally
             {
-                Logger.Log(Level.Error,
-                    "Received TcpClientConnectionException in UpdateTaskHost with message: {0}.",
-                    e.Message);
-                if (!_cancellationSource.IsCancellationRequested)
+                try
+                {
+                    _resultHandler.Dispose();
+                }
+                catch (Exception e)
                 {
-                    throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                    Logger.Log(Level.Error, "Exception in dispose result handler.", e);
+                    //// TODO throw proper exceptions JIRA REEF-1492
                 }
+                _taskCloseCoordinator.SignalTaskStopped();
+                Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested);
             }
-            _resultHandler.Dispose();
-            _taskCloseCoordinator.SignalTaskStopped();
-            Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested);
+
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
index 5a5b9c3..40816c1 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/AllowedFailedEvaluatorsFraction.cs
@@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
 {
-    [NamedParameter("Determines number of failed evaluators (AllowedFailedEvaluators * Number of mappers) tolerated before throwing exception", "failedevaluators", "2.0")]
+    [NamedParameter("Determines number of failed evaluators (AllowedFailedEvaluatorsFraction * Number of mappers) tolerated before throwing exception", "failedevaluators", "0.5")]
     internal sealed class AllowedFailedEvaluatorsFraction : Name<double>
     {
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs
new file mode 100644
index 0000000..cc7a5b7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/MaxRetryNumberInRecovery.cs
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+    /// <summary>
+    /// Maximum number of retries allowed during fault tolerant recovery.
+    /// </summary>
+    [NamedParameter("Maximum retry number in fault tolerant recovery.", "maxRetryInRecovery", "3")]
+    public sealed class MaxRetryNumberInRecovery : Name<int>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 30d110a..cd3603a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -104,6 +104,7 @@ under the License.
     <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" />
     <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" />
     <Compile Include="OnREEF\Parameters\CoresPerMapper.cs" />
+    <Compile Include="OnREEF\Parameters\MaxRetryNumberInRecovery.cs" />
     <Compile Include="OnREEF\Parameters\MemoryForUpdateTask.cs" />
     <Compile Include="OnREEF\Parameters\MemoryPerMapper.cs" />
     <Compile Include="OnREEF\Parameters\SerializedResultHandlerConfiguration.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs
index acf640b..16dba10 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/IGroupCommDriver.cs
@@ -29,7 +29,7 @@ namespace Org.Apache.REEF.Network.Group.Driver
         /// <summary>
-        /// Returns the identifier for the master task
+        /// Gets or sets the identifier for the master task
         /// </summary>
-        string MasterTaskId { get; }
+        string MasterTaskId { get; set; }
 
         ICommunicationGroupDriver DefaultGroup { get; }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
index e636b04..7fe797a 100644
--- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs
@@ -100,7 +100,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl
         /// <summary>
-        /// Returns the identifier for the master task
+        /// Gets or sets the identifier for the master task
         /// </summary>
-        public string MasterTaskId { get; private set; }
+        public string MasterTaskId { get; set; }
 
         public ICommunicationGroupDriver DefaultGroup
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs b/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs
index 6736e10..fc4a12b 100644
--- a/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs
+++ b/lang/cs/Org.Apache.REEF.Tang/Exceptions/IllegalStateException.cs
@@ -16,9 +16,11 @@
 // under the License.
 
 using System;
+using System.Runtime.Serialization;
 
 namespace Org.Apache.REEF.Tang.Exceptions
 {
+    [Serializable]
     public sealed class IllegalStateException : Exception
     {
         public IllegalStateException()
@@ -30,6 +32,12 @@ namespace Org.Apache.REEF.Tang.Exceptions
         {           
         }
 
+        // Deserialization constructor. It is private because the class is sealed (see CA2229).
+        private IllegalStateException(SerializationInfo info, StreamingContext context)
+            : base(info, context)
+        {
+        }
+
         public IllegalStateException(string message, Exception innerException)
             : base(message, innerException)
         {

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs b/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs
index 4fc991c..b9a8189 100644
--- a/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs
+++ b/lang/cs/Org.Apache.REEF.Tang/Exceptions/InjectionException.cs
@@ -16,19 +16,27 @@
 // under the License.
 
 using System;
+using System.Runtime.Serialization;
 
 namespace Org.Apache.REEF.Tang.Exceptions
 {
+    [Serializable]
     public sealed class InjectionException : Exception
     {
         internal InjectionException(string msg)
             : base(msg)
-        {           
+        {
         }
 
         internal InjectionException(string message, Exception innerException)
             : base(message, innerException)
         {
         }
+
+        // Deserialization constructor. It is private because the class is sealed (see CA2229).
+        private InjectionException(SerializationInfo info, StreamingContext context)
+            : base(info, context)
+        {
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
index 376ca7c..63126e8 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceTest.cs
@@ -28,31 +28,50 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUMapperCountTest));
 
         private static readonly int NumNodes = 4;
+        private static readonly int NumOfRetry = 2;
 
         [Fact]
         [Trait("Description", "Run IMRU broadcast and reduce example as test.")]
         void TestIMRUBroadcastReduceOnLocalRuntime()
         {
             string testFolder = DefaultRuntimeFolder + TestId;
-            TestIMRUBroadcastReduce(false, testFolder);
+            TestIMRUBroadcastReduce(false, false, testFolder);
             ValidateSuccessForLocalRuntime(NumNodes, testFolder: testFolder);
             CleanUp(testFolder);
         }
 
+        [Fact]
+        [Trait("Description", "Run IMRU broadcast and reduce example with fault tolerance as test.")]
+        void TestIMRUBroadcastReduceWithFTOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestIMRUBroadcastReduce(false, true, testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var runningTaskCount = GetMessageCount(lines, "Received IRunningTask");
+            var failedEvaluatorCount = GetMessageCount(lines, "Received IFailedEvaluator");
+            var failedTaskCount = GetMessageCount(lines, "Received IFailedTask");
+            Assert.Equal((NumOfRetry + 1) * NumNodes, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.Equal((NumOfRetry + 1) * NumNodes, runningTaskCount);
+            CleanUp(testFolder);
+        }
+
         [Fact(Skip = "Requires Yarn")]
         [Trait("Description", "Run IMRU broadcast and reduce example as test on Yarn.")]
         void TestIMRUBroadcastReduceOnYarn()
         {
-            TestIMRUBroadcastReduce(true);
+            TestIMRUBroadcastReduce(true, false);
         }
 
-        private void TestIMRUBroadcastReduce(bool runOnYarn, params string[] testFolder)
+        private void TestIMRUBroadcastReduce(bool runOnYarn, bool faultTolerant, params string[] testFolder)
         {
             var tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule
                 .Set(TcpPortConfigurationModule.PortRangeStart, "8900")
                 .Set(TcpPortConfigurationModule.PortRangeCount, "1000")
                 .Build();
-            Run.RunBroadcastReduceTest(tcpPortConfig, runOnYarn, NumNodes, new string[0], testFolder);
+
+            string[] args = { "10", "2", "512", "512", "100", NumOfRetry.ToString() };
+            Run.RunBroadcastReduceTest(tcpPortConfig, runOnYarn, NumNodes, faultTolerant, args, testFolder);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
index e58e236..3bf712b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithFilePartitionDataSetTest.cs
@@ -75,6 +75,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 iterations,
                 mapperMemory,
                 updateTaskMemory,
+                0,
                 testFolder);
             ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder, 100);
             CleanUp(testFolder);
@@ -90,11 +91,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int iterations,
             int mapperMemory,
             int updateTaskMemory,
+            int numberOfRetryInRecovery = 0,
             string testFolder = DefaultRuntimeFolder)
         {
             string runPlatform = runOnYarn ? "yarn" : "local";
             TestRun(DriverConfiguration<int[], int[], int[], IEnumerable<Row>>(
-                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory),
+                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, numberOfRetryInRecovery),
                 DriverEventHandlerConfigurations<int[], int[], int[], IEnumerable<Row>>()),
                 typeof(BroadcastReduceDriver),
                 numTasks,
@@ -121,6 +123,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
index aa287b1..2bacd5a 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBroadcastReduceWithLocalFile.cs
@@ -48,6 +48,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 iterations,
                 mapperMemory,
                 updateTaskMemory,
+                0,
                 testFolder);
             ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder, 100);
             CleanUp(testFolder);

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
index 2f4b109..f20ec31 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceTestBase.cs
@@ -16,10 +16,11 @@
 // under the License.
 
 using System;
-using System.Collections.Generic;
 using System.Globalization;
 using System.IO;
 using System.Linq;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.IMRU.API;
 using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
 using Org.Apache.REEF.IMRU.OnREEF.Driver;
@@ -29,6 +30,7 @@ using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverA
 using Org.Apache.REEF.Network.Group.Config;
 using Org.Apache.REEF.Network.Group.Driver;
 using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -48,6 +50,11 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         protected static readonly Logger Logger = Logger.GetLogger(typeof(IMRUBrodcastReduceTestBase));
         private const string IMRUJobName = "IMRUBroadcastReduce";
 
+        protected const string CompletedTaskMessage = "CompletedTaskMessage";
+        protected const string RunningTaskMessage = "RunningTaskMessage";
+        protected const string FailedTaskMessage = "FailedTaskMessage";
+        protected const string FailedEvaluatorMessage = "FailedEvaluatorMessage";
+
         /// <summary>
         /// Abstract method for subclass to override it to provide configurations for driver handlers 
         /// </summary>
@@ -70,6 +77,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="dims"></param>
         /// <param name="iterations"></param>
         /// <param name="mapperMemory"></param>
         /// <param name="updateTaskMemory"></param>
+        /// <param name="numberOfRetryInRecovery">Maximum number of retries in fault tolerant recovery</param>
         /// <param name="testFolder"></param>
         protected void TestBroadCastAndReduce(bool runOnYarn,
@@ -79,11 +87,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int iterations,
             int mapperMemory,
             int updateTaskMemory,
+            int numberOfRetryInRecovery = 0,
             string testFolder = DefaultRuntimeFolder)
         {
             string runPlatform = runOnYarn ? "yarn" : "local";
             TestRun(DriverConfiguration<int[], int[], int[], Stream>(
-                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory),
+                CreateIMRUJobDefinitionBuilder(numTasks - 1, chunkSize, iterations, dims, mapperMemory, updateTaskMemory, numberOfRetryInRecovery),
                 DriverEventHandlerConfigurations<int[], int[], int[], Stream>()),
                 typeof(BroadcastReduceDriver),
                 numTasks,
@@ -151,6 +160,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                     jobDefinition.MapTaskCores.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter(typeof(CoresForUpdateTask),
                     jobDefinition.UpdateTaskCores.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery),
+                    jobDefinition.MaxRetryNumberInRecovery.ToString(CultureInfo.InvariantCulture))
                 .BindNamedParameter(typeof(InvokeGC),
                     jobDefinition.InvokeGarbageCollectorAfterIteration.ToString(CultureInfo.InvariantCulture))
                 .Build();
@@ -190,13 +201,15 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
         /// <param name="dim"></param>
         /// <param name="mapperMemory"></param>
         /// <param name="updateTaskMemory"></param>
+        /// <param name="numberOfRetryInRecovery">Maximum number of retries in fault tolerant recovery</param>
         /// <returns></returns>
         protected IMRUJobDefinition CreateIMRUJobDefinitionBuilder(int numberofMappers,
             int chunkSize,
             int numIterations,
             int dim,
             int mapperMemory,
-            int updateTaskMemory)
+            int updateTaskMemory,
+            int numberOfRetryInRecovery)
         {
             var updateFunctionConfig =
                 TangFactory.GetTang().NewConfigurationBuilder(BuildUpdateFunctionConfig())
@@ -221,6 +234,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                 .SetNumberOfMappers(numberofMappers)
                 .SetMapperMemory(mapperMemory)
                 .SetUpdateTaskMemory(updateTaskMemory)
+                .SetMaxRetryNumberInRecovery(numberOfRetryInRecovery)
                 .Build();
         }
 
@@ -309,5 +323,51 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                     GenericType<IntArraySumReduceFunction>.Class)
                 .Build();
         }
+
+        /// <summary>
+        /// Driver event handlers that only log completed/running/failed task and failed evaluator events
+        /// </summary>
+        protected sealed class MessageLogger :
+            IObserver<ICompletedTask>,
+            IObserver<IFailedEvaluator>,
+            IObserver<IFailedTask>,
+            IObserver<IRunningTask>
+        {
+            [Inject]
+            private MessageLogger()
+            {
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                Logger.Log(Level.Info, CompletedTaskMessage + " " + value.Id + " " + value.ActiveContext.EvaluatorId);
+            }
+
+            public void OnNext(IFailedTask value)
+            {
+                // GetActiveContext() returns an Optional; check IsPresent() before reading Value, as the IFailedEvaluator handler does.
+                Logger.Log(Level.Info, FailedTaskMessage + " " + value.Id + " " + (value.GetActiveContext().IsPresent() ? value.GetActiveContext().Value.EvaluatorId : "no context"));
+            }
+
+            public void OnNext(IFailedEvaluator value)
+            {
+                Logger.Log(Level.Info, FailedEvaluatorMessage + " " + value.Id + " " + (value.FailedTask.IsPresent() ? value.FailedTask.Value.Id : "no task"));
+            }
+
+            public void OnNext(IRunningTask value)
+            {
+                Logger.Log(Level.Info, RunningTaskMessage + " " + value.Id + " " + value.ActiveContext.EvaluatorId);
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
index c00f44d..eaa405f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUBrodcastReduceWithoutIMRUClientTest.cs
@@ -38,8 +38,9 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int mapperMemory = 5120;
             int updateTaskMemory = 5120;
             int numTasks = 4;
+            int numberOfRetryInRecovery = 4;
             string testFolder = DefaultRuntimeFolder + TestId;
-            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder);
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery, testFolder);
             ValidateSuccessForLocalRuntime(numTasks, 0, 0, testFolder);
             CleanUp(testFolder);
         }
@@ -56,7 +57,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             int mapperMemory = 5120;
             int updateTaskMemory = 5120;
             int numTasks = 4;
-            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory);
+            int numberOfRetryInRecovery = 4;
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery);
         }
 
         /// <summary>
@@ -82,6 +84,8 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
                     GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
                      GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
                 .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString())

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
index b462438..068aa50 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs
@@ -34,23 +34,19 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
 {
     /// <summary>
     /// This is to test close event handler in IMRU tasks
-    /// The test provide IRunningTask, IFailedTask and ICompletedTask handlers 
so that to trigger close events and handle the 
+    /// The test provide IRunningTask, IFailedTask and ICompletedTask handlers 
so that to trigger close events and handle the
     /// failed tasks and completed tasks
     /// </summary>
     [Collection("FunctionalTests")]
     public class IMRUCloseTaskTest : IMRUBrodcastReduceTestBase
     {
-        private const string CompletedTaskMessage = "CompletedTaskMessage";
-        private const string FailEvaluatorMessage = "FailEvaluatorMessage";
-        private const string FailTaskMessage = "FailTaskMessage";
-
         /// <summary>
         /// This test is for running in local runtime
         /// It sends close event for all the running tasks.
-        /// In the task close handler, the cancellation token will be set, and 
as a result tasks will return from the Call() 
+        /// In the task close handler, the cancellation token will be set, and 
as a result tasks will return from the Call()
         /// method and driver will receive ICompletedTask.
-        /// In the exceptional case, task might throw exception from Call() 
method, as a result, driver will receive IFailedTask. 
-        /// Expect number of CompletedTask and FailedTask equals to the total 
number of tasks. No failed Evaluator. 
+        /// In the exceptional case, task might throw exception from Call() 
method, as a result, driver will receive IFailedTask.
+        /// Expect number of CompletedTask and FailedTask equals to the total 
number of tasks. No failed Evaluator.
         /// </summary>
         [Fact]
         public void TestTaskCloseOnLocalRuntime()
@@ -61,11 +57,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             const int mapperMemory = 512;
             const int updateTaskMemory = 512;
             const int numTasks = 4;
+            const int numOfRetryInRecovery = 4;
             var testFolder = DefaultRuntimeFolder + TestId;
-            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, testFolder);
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, numOfRetryInRecovery, testFolder);
             string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 
120);
             var completedCount = GetMessageCount(lines, CompletedTaskMessage);
-            var failedCount = GetMessageCount(lines, FailTaskMessage);
+            var failedCount = GetMessageCount(lines, FailedTaskMessage);
             Assert.Equal(numTasks, completedCount + failedCount);
             CleanUp(testFolder);
         }
@@ -83,12 +80,13 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             const int mapperMemory = 512;
             const int updateTaskMemory = 512;
             const int numTasks = 4;
-            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory);
+            const int numOfRetryInRecovery = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, numOfRetryInRecovery);
         }
 
         /// <summary>
         /// This method overrides base class method and defines its own event 
handlers for driver. 
-        /// It uses its own RunningTaskHandler, FailedEvaluatorHandler and 
CompletedTaskHandler, FailedTaskHandler so that to simulate the test scenarios 
+        /// It uses its own RunningTaskHandler, FailedEvaluatorHandler and 
CompletedTaskHandler, FailedTaskHandler so that to simulate the test scenarios
         /// and verify the test result. 
         /// Rest of the event handlers use those from IMRUDriver. In 
IActiveContext handler in IMRUDriver, IMRU tasks are bound for the test.
         /// </summary>
@@ -190,7 +188,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             {
                 lock (_lock)
                 {
-                    Logger.Log(Level.Info, FailTaskMessage + value.Id);
+                    Logger.Log(Level.Info, FailedTaskMessage + value.Id);
                     CloseRunningTasks();
                     value.GetActiveContext().Value.Dispose();
                 }
@@ -202,7 +200,7 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             /// <param name="value"></param>
             public void OnNext(IFailedEvaluator value)
             {
-                throw new Exception(FailEvaluatorMessage);
+                throw new Exception(FailedEvaluatorMessage);
             }
 
             /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
new file mode 100644
index 0000000..af02405
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluators.cs
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using TaskIdsToFail = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Network;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+using TraceLevel = System.Diagnostics.TraceLevel;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailMapperEvaluators : IMRUBrodcastReduceTestBase
+    {
+        /// <summary>
+        /// Number of resubmissions allowed after a failure. It is passed to TestBroadCastAndReduce as the
+        /// recovery retry count and is also bound to MaxRetryNumberInRecovery in the mapper configuration,
+        /// so both values stay in sync. Subclasses reuse it for their own failure scenarios.
+        /// </summary>
+        protected const int NumberOfRetry = 3;
+
+        /// <summary>
+        /// Fails the evaluators hosting the mapper tasks listed in TaskIdsToFail during task execution and
+        /// lets the job be resubmitted up to NumberOfRetry times. In the last retry there is no failed
+        /// evaluator and all tasks complete successfully.
+        /// </summary>
+        [Fact]
+        public virtual void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completedTaskCount = GetMessageCount(lines, CompletedTaskMessage);
+            var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+
+            // On each try every task either completes, fails, or disappears with its failed evaluator,
+            // and on each try all tasks start successfully.
+            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Same failed-mapper-evaluator scenario as TestFailedMapperOnLocalRuntime, submitted to YARN.
+        /// The retry count is passed explicitly, as in the local runtime test, so that it matches the
+        /// MaxRetryNumberInRecovery value bound in BuildMapperFunctionConfig.
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public virtual void TestFailedMapperOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, NumberOfRetry);
+        }
+
+        /// <summary>
+        /// Defines the driver event handlers. IMRUDriver is registered for every driver event the job needs.
+        /// In addition, MessageLogger is registered for task completed, task failed, task running and
+        /// evaluator failed events. Presumably it writes the messages the tests count in the driver log.
+        /// </summary>
+        /// <typeparam name="TMapInput">Map input type</typeparam>
+        /// <typeparam name="TMapOutput">Map output type</typeparam>
+        /// <typeparam name="TResult">Result type</typeparam>
+        /// <typeparam name="TPartitionType">Partition type</typeparam>
+        /// <returns>Driver event handler configuration</returns>
+        protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>()
+        {
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Verbose.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Mapper function configuration. It uses TestSenderMapFunction and lists the tasks to fail in
+        /// TaskIdsToFail, with FailureType set to EvaluatorFailureDuringTaskExecution. It also binds
+        /// MaxRetryNumberInRecovery to NumberOfRetry and applies the TCP client retry settings.
+        /// Subclasses can override it to inject a different failure.
+        /// </summary>
+        /// <returns>Configuration for the map function</returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c1 = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)
+                .Build();
+
+            var c2 = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .Build();
+
+            return Configurations.Merge(c1, c2, GetTcpConfiguration());
+        }
+
+        /// <summary>
+        /// Update function configuration. It uses BroadcastSenderReduceReceiverUpdateFunction together
+        /// with the TCP client retry settings. Subclasses can override it to use their own update function.
+        /// </summary>
+        /// <returns>Configuration for the update function</returns>
+        protected override IConfiguration BuildUpdateFunctionConfig()
+        {
+            var c = IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule
+                .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction,
+                    GenericType<BroadcastSenderReduceReceiverUpdateFunction>.Class)
+                .Build();
+
+            return Configurations.Merge(c, GetTcpConfiguration());
+        }
+
+        /// <summary>
+        /// Overrides the default TCP client retry policy. It allows up to 200 connection retries, with
+        /// SleepTime 1000 between them (units as defined by TcpClientConfigurationModule.SleepTime).
+        /// </summary>
+        /// <returns>TCP client configuration</returns>
+        private IConfiguration GetTcpConfiguration()
+        {
+            return TcpClientConfigurationModule.ConfigurationModule
+                .Set(TcpClientConfigurationModule.MaxConnectionRetry, "200")
+                .Set(TcpClientConfigurationModule.SleepTime, "1000")
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
new file mode 100644
index 0000000..6fecb2c
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperEvaluatorsOnInit.cs
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using TaskIdsToFail = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailMapperEvaluatorsOnInit : TestFailMapperEvaluators
+    {
+        /// <summary>
+        /// Fails the evaluators of the mapper tasks listed in TaskIdsToFail during task initialization.
+        /// The job may be resubmitted up to NumberOfRetry times. Some tasks never reach the running state,
+        /// so only the total of completed tasks, failed tasks and failed evaluators is checked.
+        /// </summary>
+        [Fact(Skip = "Times out at high timeout for RetryCountWaitingForRegistration; disabling until this parameter is configurable in test.")]
+        public override void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 360);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+
+            // On each try every task either completes, fails, or disappears with its failed evaluator.
+            // Not all tasks start successfully, so the running task count is not checked.
+            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedEvaluatorCount + failedTaskCount);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Mapper function configuration. It uses TestSenderMapFunction and lists the tasks to fail in
+        /// TaskIdsToFail, with FailureType set to EvaluatorFailureDuringTaskInitialization. It also binds
+        /// MaxRetryNumberInRecovery to NumberOfRetry.
+        /// </summary>
+        /// <returns>Configuration for the map function</returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskInitialization.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
new file mode 100644
index 0000000..dc998fc
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using TaskIdsToFail = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TaskIdsToFail;
+using FailureType = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using TestSenderMapFunction = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.TestSenderMapFunction;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailMapperTasks : TestFailMapperEvaluators
+    {
+        /// <summary>
+        /// Makes the mapper tasks listed in TaskIdsToFail fail during task execution. These are task
+        /// failures, not evaluator failures. The job may be resubmitted up to NumberOfRetry times.
+        /// In every round all tasks start, each task either completes or fails, and no evaluator fails.
+        /// </summary>
+        [Fact]
+        public override void TestFailedMapperOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 9;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false,
+                numTasks,
+                chunkSize,
+                dims,
+                iterations,
+                mapperMemory,
+                updateTaskMemory,
+                NumberOfRetry,
+                testFolder);
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completedTaskCount = GetMessageCount(lines, "Received ICompletedTask");
+            var runningTaskCount = GetMessageCount(lines, RunningTaskMessage);
+            var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
+            var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
+
+            // On each try every task either fails or completes,
+            // there should be no failed evaluators,
+            // and on each try all tasks should start successfully.
+            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedTaskCount);
+            Assert.Equal(0, failedEvaluatorCount);
+            Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Mapper function configuration. It uses TestSenderMapFunction and lists the tasks to fail in
+        /// TaskIdsToFail, with FailureType set to TaskFailureDuringTaskExecution. It also binds
+        /// MaxRetryNumberInRecovery to NumberOfRetry.
+        /// </summary>
+        /// <returns>Configuration for the map function</returns>
+        protected override IConfiguration BuildMapperFunctionConfig()
+        {
+            var c = IMRUMapConfiguration<int[], int[]>.ConfigurationModule
+                .Set(IMRUMapConfiguration<int[], int[]>.MapFunction,
+                    GenericType<TestSenderMapFunction>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-2-")
+                .BindSetEntry<TaskIdsToFail, string>(GenericType<TaskIdsToFail>.Class, "IMRUMap-RandomInputPartition-3-")
+                .BindIntNamedParam<FailureType>(FailureType.TaskFailureDuringTaskExecution.ToString())
+                .BindNamedParameter(typeof(MaxRetryNumberInRecovery), NumberOfRetry.ToString())
+                .Build();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
new file mode 100644
index 0000000..cf16e25
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluator.cs
@@ -0,0 +1,236 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce;
+using FailureType = 
Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce.FaultTolerantPipelinedBroadcastAndReduce.FailureType;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+using TraceLevel = System.Diagnostics.TraceLevel;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailUpdateEvaluator : IMRUBrodcastReduceTestBase
+    {
+        private const int NumberOfRetry = 3;
+
+        /// <summary>
+        /// Fails the update task's evaluator and lets the job try to resubmit. Recovery from an update
+        /// evaluator failure is not supported, so exactly one round of tasks is expected. In that round
+        /// every task starts, no task fails, and each task either completes or is lost with its evaluator.
+        /// </summary>
+        [Fact]
+        public virtual void TestFailedUpdateOnLocalRuntime()
+        {
+            const int chunkSize = 2;
+            const int dims = 100;
+            const int iterations = 200;
+            const int mapperMemory = 5120;
+            const int updateTaskMemory = 5120;
+            const int numTasks = 9;
+            var testFolder = DefaultRuntimeFolder + TestId;
+
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, NumberOfRetry, testFolder);
+
+            var driverLog = ReadLogFile(DriverStdout, "driver", testFolder, 240);
+            var completed = GetMessageCount(driverLog, "Received ICompletedTask");
+            var running = GetMessageCount(driverLog, RunningTaskMessage);
+            var failedEvaluators = GetMessageCount(driverLog, FailedEvaluatorMessage);
+            var failedTasks = GetMessageCount(driverLog, FailedTaskMessage);
+
+            // A single try: completions plus evaluator losses cover all tasks, no task failures,
+            // and every task reported as running.
+            Assert.Equal(numTasks, completed + failedEvaluators);
+            Assert.Equal(0, failedTasks);
+            Assert.Equal(numTasks, running);
+
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// Same failed-update-evaluator scenario as TestFailedUpdateOnLocalRuntime, submitted to YARN.
+        /// The retry count is passed explicitly, as in the local runtime test.
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public virtual void TestFailedUpdateOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 100;
+            int iterations = 200;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, NumberOfRetry);
+        }
+
+        /// <summary>
+        /// Defines the driver event handlers. IMRUDriver is registered for every driver event the job needs.
+        /// In addition, MessageLogger is registered for task completed, task failed, task running and
+        /// evaluator failed events. Presumably it writes the messages the test counts in the driver log.
+        /// The registration order is kept as-is.
+        /// </summary>
+        /// <typeparam name="TMapInput">Map input type</typeparam>
+        /// <typeparam name="TMapOutput">Map output type</typeparam>
+        /// <typeparam name="TResult">Result type</typeparam>
+        /// <typeparam name="TPartitionType">Partition type</typeparam>
+        /// <returns>Driver event handler configuration</returns>
+        protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>()
+        {
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Verbose.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Update function configuration. It uses TestUpdateFunction and binds FailureType to
+        /// EvaluatorFailureDuringTaskExecution, which TestUpdateFunction receives as its failure type.
+        /// </summary>
+        /// <returns>Configuration for the update function</returns>
+        protected override IConfiguration BuildUpdateFunctionConfig()
+        {
+            var c = IMRUUpdateConfiguration<int[], int[], int[]>.ConfigurationModule
+                .Set(IMRUUpdateConfiguration<int[], int[], int[]>.UpdateFunction,
+                    GenericType<TestUpdateFunction>.Class)
+                .Build();
+
+            return TangFactory.GetTang().NewConfigurationBuilder(c)
+                .BindIntNamedParam<FailureType>(FailureType.EvaluatorFailureDuringTaskExecution.ToString())
+                .Build();
+        }
+
+        internal sealed class TestUpdateFunction : IUpdateFunction<int[], 
int[], int[]>
+        {
+            // Number of Update calls processed so far.
+            private int _iterations;
+            private readonly int _maxIters;
+            private readonly int _dim;
+            // Buffer filled and returned by Update via AnotherRound.
+            private readonly int[] _intArr;
+            private readonly int _workers;
+            private readonly string _taskId;
+            private int _failureType;
+
+            /// <summary>
+            /// Captures the injected iteration limit, vector dimension, worker count, task id and failure
+            /// type. It allocates the broadcast buffer and logs the task id and the kind of failure configured.
+            /// </summary>
+            [Inject]
+            private TestUpdateFunction(
+                [Parameter(typeof(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters,
+                [Parameter(typeof(BroadcastReduceConfiguration.Dimensions))] int dim,
+                [Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers,
+                [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId,
+                [Parameter(typeof(FailureType))] int failureType)
+            {
+                _taskId = taskId;
+                _failureType = failureType;
+                _maxIters = maxIters;
+                _workers = numWorkers;
+                _dim = dim;
+                _intArr = new int[dim];
+                _iterations = 0;
+
+                Logger.Log(Level.Info, "TestUpdateFunction: TaskId: {0}", _taskId);
+                string failureKind = FailureType.IsEvaluatorFailure(_failureType) ? "evaluator" : "task";
+                Logger.Log(Level.Info, "Failure type: {0} failure", failureKind);
+            }
+
+            /// <summary>
+            /// Update function
+            /// </summary>
+            /// <param name="input">integer array</param>
+            /// <returns>The same integer array</returns>
+            UpdateResult<int[], int[]> IUpdateFunction<int[], int[], 
int[]>.Update(int[] input)
+            {
+                if (input[0] != (_iterations + 1) * _workers)
+                {
+                    Exceptions.Throw(new Exception("Expected input to update 
functon not same as actual input"), Logger);
+                }
+
+                _iterations++;
+                Logger.Log(Level.Info, "Received value {0} in iteration {1}", 
input[0], _iterations);
+                MakeException();
+
+                if (_iterations < _maxIters)
+                {
+                    for (int i = 0; i < _dim; i++)
+                    {
+                        _intArr[i] = _iterations + 1;
+                    }
+
+                    return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+                }
+
+                return UpdateResult<int[], int[]>.Done(input);
+            }
+
+            /// <summary>
+            /// Initialize function. Sends integer array with value 1 to all 
mappers
+            /// </summary>
+            /// <returns>Map input</returns>
+            UpdateResult<int[], int[]> IUpdateFunction<int[], int[], 
int[]>.Initialize()
+            {
+                for (int i = 0; i < _dim; i++)
+                {
+                    _intArr[i] = _iterations + 1;
+                }
+
+                return UpdateResult<int[], int[]>.AnotherRound(_intArr);
+            }
+
+            private void MakeException()
+            {
+                if (_iterations == 10 && !_taskId.EndsWith("-" + 
NumberOfRetry))
+                { 
+                    Logger.Log(Level.Warning, "Simulating {0} failure for 
taskId {1}",
+                        FailureType.IsEvaluatorFailure(_failureType) ? 
"evaluator" : "task",
+                        _taskId);
+                    if (FailureType.IsEvaluatorFailure(_failureType))
+                    {
+                        // simulate evaluator failure
+                        Environment.Exit(1);
+                    }
+                    else
+                    {
+                        // simulate task failure
+                        throw new ArgumentNullException("Simulating task 
failure");
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 8856fca..d043ab1 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -124,6 +124,10 @@ under the License.
     <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" />
     <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />
     <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnInit.cs" />
+    <Compile Include="Functional\IMRU\TestFailUpdateEvaluator.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperTasks.cs" />
+    <Compile Include="Functional\IMRU\TestFailMapperEvaluators.cs" />
     <Compile Include="Functional\IMRU\TestTaskExceptions.cs" />
     <Compile 
Include="Functional\Messaging\TestContextMessageSourceAndHandler.cs" />
     <Compile Include="Functional\Messaging\TestMessageEventManager.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
index 4a8a048..572b245 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingLink.cs
@@ -162,8 +162,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             }
             catch (Exception e)
             {
-                Logger.Log(Level.Warning, "In Read function unable to read the 
message.");
-                Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+                Logger.Log(Level.Warning, "In StreamingLink::Read function 
unable to read the message. {0}: {1}", e.GetType(), e.Message); // keep the message: visible callers log only the type
                 throw;
             }
         }
@@ -186,8 +185,7 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             }
             catch (Exception e)
             {
-                Logger.Log(Level.Warning, "In ReadAsync function unable to 
read the message.");
-                Exceptions.CaughtAndThrow(e, Level.Error, Logger);
+                Logger.Log(Level.Warning, "In StreamingLink::ReadAsync 
function unable to read the message. {0}: {1}", e.GetType(), e.Message); // keep the message: visible callers log only the type
                 throw;
             }
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
index 0ec4c8a..cca8abd 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportClient.cs
@@ -19,6 +19,7 @@ using System;
 using System.Net;
 using System.Threading;
 using System.Threading.Tasks;
+using Org.Apache.REEF.Utilities.AsyncUtils;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
 using Org.Apache.REEF.Wake.StreamingCodec;
@@ -68,7 +69,15 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             : this(remoteEndpoint, streamingCodec, clientFactory)
         {
             _observer = observer;
-            Task.Factory.StartNew(() => ResponseLoop(), 
TaskCreationOptions.LongRunning);
+            try
+            {
+                Task.Factory.StartNew(() => ResponseLoop(), 
TaskCreationOptions.LongRunning);
+            }
+            catch (Exception e) // only StartNew's own failures arrive here; exceptions inside ResponseLoop fault the discarded task and are logged by ResponseLoop
+            {
+                Logger.Log(Level.Warning, "StreamingTransportClient failed to 
start ResponseLoop: {0}.", e.GetType());
+                throw; // "throw;" keeps the original stack trace ("throw e;" would reset it)
+            }
 
         /// <summary>
@@ -111,16 +120,24 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// </summary>
         private async Task ResponseLoop()
         {
-            while (!_cancellationSource.IsCancellationRequested)
+            try
             {
-                T message = await _link.ReadAsync(_cancellationSource.Token);
-                if (message == null)
+                while (!_cancellationSource.IsCancellationRequested)
                 {
-                    break;
-                }
+                    T message = await 
_link.ReadAsync(_cancellationSource.Token);
+                    if (message == null)
+                    {
+                        break;
+                    }
 
-                TransportEvent<T> transportEvent = new 
TransportEvent<T>(message, _link);
-                _observer.OnNext(transportEvent);
+                    TransportEvent<T> transportEvent = new 
TransportEvent<T>(message, _link);
+                    _observer.OnNext(transportEvent);
+                }
+            }
+            catch (Exception e)
+            {
+                Logger.Log(Level.Warning, "StreamingTransportClient get 
exception in ResponseLoop: {0}.", e.GetType());
+                throw; // preserves the stack trace ("throw e;" would reset it); faults the task returned by ResponseLoop
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
index 706fac4..6a9ea1a 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Impl/StreamingTransportServer.cs
@@ -162,7 +162,9 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
                 {
                     TcpClient client = await 
_listener.AcceptTcpClientAsync().ConfigureAwait(false);
                     ProcessClient(client).LogAndIgnoreExceptionIfAny(
-                        LOGGER, "Task Exception observed processing client in 
StreamingTransportServer.", Level.Warning);
+                        LOGGER,
+                        "StreamingTransportServer observed Task Exception 
during client processing.",
+                        Level.Warning);
                 }
             }
             catch (InvalidOperationException)
@@ -173,6 +175,11 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
             {
                 LOGGER.Log(Level.Info, "StreamingTransportServer has been 
closed.");
             }
+            catch (Exception e)
+            {
+                LOGGER.Log(Level.Warning, "StreamingTransportServer got 
exception: {0}.", e.GetType());
+                throw; // "throw;" keeps the original stack trace ("throw e;" would reset it)
+            }
         }
 
         /// <summary>
@@ -181,24 +188,32 @@ namespace Org.Apache.REEF.Wake.Remote.Impl
         /// <param name="client">The connected client</param>
         private async Task ProcessClient(TcpClient client)
         {
-            // Keep reading messages from client until they disconnect or 
timeout
             CancellationToken token = _cancellationSource.Token;
-            using (ILink<T> link = new StreamingLink<T>(client, 
_streamingCodec))
+            try
             {
-                while (!token.IsCancellationRequested)
+                // Keep reading messages from client until they disconnect or 
timeout
+                using (ILink<T> link = new StreamingLink<T>(client, 
_streamingCodec))
                 {
-                    T message = await link.ReadAsync(token);
-
-                    if (message == null)
+                    while (!token.IsCancellationRequested)
                     {
-                        break;
-                    }
+                        T message = await link.ReadAsync(token);
 
-                    TransportEvent<T> transportEvent = new 
TransportEvent<T>(message, link);
-                    _remoteObserver.OnNext(transportEvent);
+                        if (message == null)
+                        {
+                            break;
+                        }
+
+                        TransportEvent<T> transportEvent = new 
TransportEvent<T>(message, link);
+                        _remoteObserver.OnNext(transportEvent);
+                    }
+                    LOGGER.Log(Level.Error,
+                        "ProcessClient close the Link. 
IsCancellationRequested: " + token.IsCancellationRequested);
                 }
-                LOGGER.Log(Level.Error,
-                    "ProcessClient close the Link. IsCancellationRequested: " 
+ token.IsCancellationRequested);
+            }
+            catch (Exception e)
+            {
+                LOGGER.Log(Level.Warning, "StreamingTransportServer get 
exception in ProcessClient: {0}, IsCancellationRequested {1}.", e.GetType(), 
token.IsCancellationRequested);
+                throw; // preserves the stack trace ("throw e;" would reset it) for the caller's LogAndIgnoreExceptionIfAny
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs 
b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
index 934983f..75087d5 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Remote/Parameters/ConnectionRetryCount.cs
@@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations;
 
 namespace Org.Apache.REEF.Wake.Remote.Parameters
 {
-    [NamedParameter("Number of retries for connecting to endpoint", 
defaultValue: "20")]
+    [NamedParameter("Number of retries for connecting to endpoint", 
defaultValue: "200")] // NOTE(review): default raised 10x (20 -> 200) by this change; confirm the longer connect-retry window is intended for every user of this public default, not only the new IMRU failure tests
     public sealed class ConnectionRetryCount : Name<int>
     {
     }

Reply via email to