Repository: reef
Updated Branches:
  refs/heads/master 9b034dd3c -> 1c7adac04


[REEF-1550] Clean up task exceptions in IMRU task hosts

* Add base class TaskHostBase to hold common IMRU configurations
  and exception handling logic
* Refactor MapTaskHost and UpdateTaskHost
* Update test cases

JIRA:
  [REEF-1550](https://issues.apache.org/jira/browse/REEF-1550)

Pull request:
  This closes #1113


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1c7adac0
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1c7adac0
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1c7adac0

Branch: refs/heads/master
Commit: 1c7adac041243973f53ec98b0cf8cd468c58dd93
Parents: 9b034dd
Author: Julia Wang <[email protected]>
Authored: Fri Sep 9 14:13:41 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Fri Sep 9 16:50:20 2016 -0700

----------------------------------------------------------------------
 .../OnREEF/IMRUTasks/MapTaskHost.cs             | 175 +++------------
 .../OnREEF/IMRUTasks/TaskHostBase.cs            | 223 +++++++++++++++++++
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          | 211 +++++-------------
 .../Org.Apache.REEF.IMRU.csproj                 |   1 +
 .../Functional/IMRU/TestFailMapperTasks.cs      |   9 +-
 5 files changed, 309 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
index ca2fb85..f843036 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs
@@ -16,10 +16,6 @@
 // under the License.
 
 using System;
-using System.IO;
-using System.Net.Sockets;
-using System.Runtime.Remoting;
-using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.IMRU.API;
@@ -31,7 +27,6 @@ using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
 {
@@ -41,197 +36,83 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
     /// <typeparam name="TMapInput">Map input</typeparam>
     /// <typeparam name="TMapOutput">Map output</typeparam>
     [ThreadSafe]
-    internal sealed class MapTaskHost<TMapInput, TMapOutput> : ITask, 
IObserver<ICloseEvent>
+    internal sealed class MapTaskHost<TMapInput, TMapOutput> : TaskHostBase, 
ITask, IObserver<ICloseEvent>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(MapTaskHost<TMapInput, TMapOutput>));
 
         private readonly 
IBroadcastReceiver<MapInputWithControlMessage<TMapInput>> 
_dataAndMessageReceiver;
         private readonly IReduceSender<TMapOutput> _dataReducer;
         private readonly IMapFunction<TMapInput, TMapOutput> _mapTask;
-        private readonly bool _invokeGC;
-
-        /// <summary>
-        /// Shows if the object has been disposed.
-        /// </summary>
-        private int _disposed = 0;
-
-        /// <summary>
-        /// Group Communication client for the task
-        /// </summary>
-        private readonly IGroupCommClient _groupCommunicationsClient;
-
-        /// <summary>
-        /// Task close Coordinator to handle the work when receiving task 
close event
-        /// </summary>
-        private readonly TaskCloseCoordinator _taskCloseCoordinator;
-
-        /// <summary>
-        /// The cancellation token to control the group communication 
operation cancellation
-        /// </summary>
-        private readonly CancellationTokenSource _cancellationSource;
 
         /// <summary>
         /// </summary>
         /// <param name="mapTask">The MapTask hosted in this REEF Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
         /// <param name="taskCloseCoordinator">Task close Coordinator</param>
-        /// <param name="invokeGC">Whether to call Garbage Collector after 
each iteration or not</param>
+        /// <param name="invokeGc">Whether to call Garbage Collector after 
each iteration or not</param>
         /// <param name="taskId">task id</param>
         [Inject]
         private MapTaskHost(
             IMapFunction<TMapInput, TMapOutput> mapTask,
             IGroupCommClient groupCommunicationsClient,
             TaskCloseCoordinator taskCloseCoordinator,
-            [Parameter(typeof(InvokeGC))] bool invokeGC,
-            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string 
taskId)
+            [Parameter(typeof(InvokeGC))] bool invokeGc,
+            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string 
taskId) :
+            base(groupCommunicationsClient, taskCloseCoordinator, invokeGc)
         {
             Logger.Log(Level.Info, "Entering constructor of MapTaskHost for 
task id {0}", taskId);
             _mapTask = mapTask;
-            _groupCommunicationsClient = groupCommunicationsClient;
-            var cg = 
groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
             _dataAndMessageReceiver =
-                
cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
-            _dataReducer = 
cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName);
-            _invokeGC = invokeGC;
-            _taskCloseCoordinator = taskCloseCoordinator;
-            _cancellationSource = new CancellationTokenSource();
+                
_communicationGroupClient.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
+            _dataReducer = 
_communicationGroupClient.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName);
             Logger.Log(Level.Info, "MapTaskHost initialized.");
         }
 
         /// <summary>
         /// Performs IMRU iterations on map side
         /// </summary>
-        /// <param name="memento"></param>
         /// <returns></returns>
-        public byte[] Call(byte[] memento)
+        protected override byte[] TaskBody(byte[] memento)
         {
-            Logger.Log(Level.Info, "Entering MapTaskHost Call().");
             MapControlMessage controlMessage = MapControlMessage.AnotherRound;
-            try
+            while (!_cancellationSource.IsCancellationRequested && 
controlMessage != MapControlMessage.Stop)
             {
-                while (!_cancellationSource.IsCancellationRequested && 
controlMessage != MapControlMessage.Stop)
+                if (_invokeGc)
                 {
-                    if (_invokeGC)
-                    {
-                        Logger.Log(Level.Verbose, "Calling Garbage Collector");
-                        GC.Collect();
-                        GC.WaitForPendingFinalizers();
-                    }
-
-                    using (
-                        MapInputWithControlMessage<TMapInput> mapInput =
-                            
_dataAndMessageReceiver.Receive(_cancellationSource))
-                    {
-                        controlMessage = mapInput.ControlMessage;
-                        if (controlMessage != MapControlMessage.Stop)
-                        {
-                            _dataReducer.Send(_mapTask.Map(mapInput.Message), 
_cancellationSource);
-                        }
-                    }
+                    Logger.Log(Level.Verbose, "Calling Garbage Collector");
+                    GC.Collect();
+                    GC.WaitForPendingFinalizers();
                 }
-            }
-            catch (OperationCanceledException e)
-            {
-                Logger.Log(Level.Warning,
-                    "Received OperationCanceledException in MapTaskHost with 
message: {0}. The cancellation token is: {1}.",
-                    e.Message,
-                    _cancellationSource.IsCancellationRequested);
-            }
-            catch (Exception e)
-            {
-                if (e is IOException || e is TcpClientConnectionException || e 
is RemotingException ||
-                    e is SocketException)
-                {
-                    Logger.Log(Level.Error,
-                        "Received Exception {0} in MapTaskHost with message: 
{1}. The cancellation token is: {2}.",
-                        e.GetType(),
-                        e.Message,
-                        _cancellationSource.IsCancellationRequested);
-                    if (!_cancellationSource.IsCancellationRequested)
-                    {
-                        Logger.Log(Level.Error,
-                            "MapTask is throwing 
IMRUTaskGroupCommunicationException with cancellation token: {0}.",
-                            _cancellationSource.IsCancellationRequested);
-                        throw new 
IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
-                    }
-                }
-                else if (e is AggregateException)
+
+                using (
+                    MapInputWithControlMessage<TMapInput> mapInput =
+                        _dataAndMessageReceiver.Receive(_cancellationSource))
                 {
-                    Logger.Log(Level.Error,
-                        "Received AggregateException. The cancellation token 
is: {0}.",
-                        _cancellationSource.IsCancellationRequested);
-                    if (e.InnerException != null)
-                    {
-                        Logger.Log(Level.Error,
-                            "InnerException {0}, with message {1}.",
-                            e.InnerException.GetType(),
-                            e.InnerException.Message);
-                    }
-                    if (!_cancellationSource.IsCancellationRequested)
+                    controlMessage = mapInput.ControlMessage;
+                    if (controlMessage != MapControlMessage.Stop)
                     {
-                        if (e.InnerException != null && e.InnerException is 
IOException)
+                        TMapOutput output = default(TMapOutput);
+                        try
                         {
-                            Logger.Log(Level.Error,
-                                "MapTask is throwing 
IMRUTaskGroupCommunicationException with cancellation token: {0}.",
-                                _cancellationSource.IsCancellationRequested);
-                            throw new 
IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
+                            output = _mapTask.Map(mapInput.Message);
                         }
-                        else
+                        catch (Exception e)
                         {
-                            throw e;
+                            HandleTaskAppException(e);
                         }
-                    }                   
-                }
-                else
-                {
-                    Logger.Log(Level.Error,
-                       "MapTask is throwing Exception {0}, message {1} with 
cancellation token: {2} and StackTrace {3}.",
-                       e.GetType(),
-                       e.Message,
-                       _cancellationSource.IsCancellationRequested,
-                       e.StackTrace);
-                    if (!_cancellationSource.IsCancellationRequested)
-                    {
-                        throw e;
+                        _dataReducer.Send(output, _cancellationSource);
                     }
                 }
             }
-            finally
-            {
-                _taskCloseCoordinator.SignalTaskStopped();
-            } 
-            Logger.Log(Level.Info, "MapTaskHost returned with cancellation 
token:{0}.", _cancellationSource.IsCancellationRequested);
             return null;
         }
 
         /// <summary>
-        /// Task close handler. Calls TaskCloseCoordinator to handle the event.
+        /// Returns the name of this task host, MapTaskHost
         /// </summary>
-        /// <param name="closeEvent"></param>
-        public void OnNext(ICloseEvent closeEvent)
-        {
-            _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
-        }
-
-        /// <summary>
-        /// Dispose function. Dispose IGroupCommunicationsClient.
-        /// </summary>
-        public void Dispose()
-        {
-            if (Interlocked.Exchange(ref _disposed, 1) == 0)
-            {
-                _groupCommunicationsClient.Dispose();
-            }
-        }
-
-        public void OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        public void OnCompleted()
+        protected override string TaskHostName
         {
-            throw new NotImplementedException();
+            get { return "MapTaskHost"; }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
new file mode 100644
index 0000000..d025528
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs
@@ -0,0 +1,223 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using System.Net.Sockets;
+using System.Runtime.Remoting;
+using System.Threading;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
+{
+    internal abstract class TaskHostBase
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TaskHostBase));
+
+        /// <summary>
+        /// Shows if the object has been disposed.
+        /// </summary>
+        private int _disposed;
+
+        /// <summary>
+        /// Group Communication client for the task
+        /// </summary>
+        private readonly IGroupCommClient _groupCommunicationsClient;
+
+        /// <summary>
+        /// Task close Coordinator to handle the work when receiving task 
close event
+        /// </summary>
+        private readonly TaskCloseCoordinator _taskCloseCoordinator;
+
+        /// <summary>
+        /// Specify whether to invoke garbage collector or not
+        /// </summary>
+        protected readonly bool _invokeGc;
+
+        /// <summary>
+        /// CommunicationGroupClient for the task
+        /// </summary>
+        protected readonly ICommunicationGroupClient _communicationGroupClient;
+
+        /// <summary>
+        /// The cancellation token to control the group communication 
operation cancellation
+        /// </summary>
+        protected readonly CancellationTokenSource _cancellationSource;
+
+        /// <summary>
+        /// Task host base class to hold the common stuff of both mapper and 
update tasks
+        /// </summary>
+        /// <param name="groupCommunicationsClient">Group Communication 
Client</param>
+        /// <param name="taskCloseCoordinator">The class that handles the 
close event for the task</param>
+        /// <param name="invokeGc">Specifies whether to invoke the garbage 
collector or not</param>
+        protected TaskHostBase(
+            IGroupCommClient groupCommunicationsClient,
+            TaskCloseCoordinator taskCloseCoordinator,
+            bool invokeGc)
+        {
+            _groupCommunicationsClient = groupCommunicationsClient;
+            _communicationGroupClient = 
groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
+
+            _invokeGc = invokeGc;
+            _taskCloseCoordinator = taskCloseCoordinator;
+            _cancellationSource = new CancellationTokenSource();
+        }
+
+        /// <summary>
+        /// Handle the exceptions thrown in the Call() method.
+        /// Default to IMRUTaskSystemException to make it recoverable.
+        /// </summary>
+        public byte[] Call(byte[] memento)
+        {
+            Logger.Log(Level.Info, "Entering {0} Call().", TaskHostName);
+            try
+            {
+                return TaskBody(memento);
+            }
+            catch (Exception e)
+            {
+                if (e is IMRUTaskAppException)
+                {
+                    throw;
+                }
+                if (IsCommunicationException(e))
+                {
+                    HandleCommunicationException(e);
+                }
+                else
+                {
+                    HandleSystemException(e);
+                }
+            }
+            finally
+            {
+                FinallyBlock();
+                _taskCloseCoordinator.SignalTaskStopped();
+            }
+            Logger.Log(Level.Info, "{0} returned with cancellation 
token:{1}.", TaskHostName, _cancellationSource.IsCancellationRequested);
+            return null;
+        }
+
+        private static bool IsCommunicationException(Exception e)
+        {
+            if (e is OperationCanceledException || e is IOException || e is 
TcpClientConnectionException ||
+                e is RemotingException || e is SocketException ||
+                (e is AggregateException && e.InnerException != null && 
e.InnerException is IOException))
+            {
+                return true;
+            }
+            return false;
+        }
+
+        /// <summary>
+        /// The body of Call method. Subclass must override it. 
+        /// </summary>
+        protected abstract byte[] TaskBody(byte[] memento);
+
+        /// <summary>
+        /// The code that needs to be executed in the Call() method whether or 
not an exception happens.
+        /// </summary>
+        protected virtual void FinallyBlock()
+        {
+        }
+
+        /// <summary>
+        /// Task host name
+        /// </summary>
+        protected abstract string TaskHostName { get; }
+
+        /// <summary>
+        /// Task close handler. Call TaskCloseCoordinator to handle the event.
+        /// </summary>
+        public void OnNext(ICloseEvent closeEvent)
+        {
+            _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
+        }
+
+        /// <summary>
+        /// Dispose function. Dispose IGroupCommunicationsClient.
+        /// </summary>
+        public void Dispose()
+        {
+            if (Interlocked.Exchange(ref _disposed, 1) == 0)
+            {
+                _groupCommunicationsClient.Dispose();
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Convert the exception into IMRUTaskGroupCommunicationException
+        /// </summary>
+        protected void HandleCommunicationException(Exception e)
+        {
+            HandleException(e, new 
IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError, 
e));
+        }
+
+        /// <summary>
+        /// Convert the exception into IMRUTaskSystemException
+        /// </summary>
+        protected void HandleSystemException(Exception e)
+        {
+            HandleException(e, new 
IMRUTaskSystemException(TaskManager.TaskSystemError, e));
+        }
+
+        /// <summary>
+        /// Convert the exception into IMRUTaskAppException
+        /// </summary>
+        protected void HandleTaskAppException(Exception e)
+        {
+            HandleException(e, new 
IMRUTaskAppException(TaskManager.TaskAppError, e));
+        }
+
+        /// <summary>
+        /// Log and throw target exception if cancellation token is not set
+        /// In the cancellation case, simply log and return.
+        /// </summary>
+        private void HandleException(Exception originalException, Exception 
targetException)
+        {
+            Logger.Log(Level.Error,
+                "Received Exception {0} in {1} with message: {2}. The 
cancellation token is: {3}.",
+                originalException.GetType(),
+                TaskHostName,
+                originalException.Message,
+                _cancellationSource.IsCancellationRequested);
+            if (!_cancellationSource.IsCancellationRequested)
+            {
+                Logger.Log(Level.Error,
+                    "{0} is throwing {1} with cancellation token: {2}.",
+                    TaskHostName,
+                    targetException.GetType(),
+                    _cancellationSource.IsCancellationRequested);
+                throw targetException;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index 01a2bdb..382721c 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -16,10 +16,6 @@
 // under the License.
 
 using System;
-using System.IO;
-using System.Net.Sockets;
-using System.Runtime.Remoting;
-using System.Threading;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.IMRU.API;
@@ -31,7 +27,6 @@ using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote.Impl;
 
 namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
 {
@@ -42,43 +37,22 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
     /// <typeparam name="TMapOutput">Map output</typeparam>
     /// <typeparam name="TResult">Final result</typeparam>
     [ThreadSafe]
-    internal sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : 
ITask, IObserver<ICloseEvent>
+    internal sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : 
TaskHostBase, ITask, IObserver<ICloseEvent>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(UpdateTaskHost<TMapInput, TMapOutput, TResult>));
 
         private readonly IReduceReceiver<TMapOutput> _dataReceiver;
         private readonly 
IBroadcastSender<MapInputWithControlMessage<TMapInput>> 
_dataAndControlMessageSender;
         private readonly IUpdateFunction<TMapInput, TMapOutput, TResult> 
_updateTask;
-        private readonly bool _invokeGC;
         private readonly IIMRUResultHandler<TResult> _resultHandler;
 
         /// <summary>
-        /// Shows if the object has been disposed.
-        /// </summary>
-        private int _disposed = 0;
-
-        /// <summary>
-        /// Group Communication client for the task
-        /// </summary>
-        private readonly IGroupCommClient _groupCommunicationsClient;
-
-        /// <summary>
-        /// Task close Coordinator to handle the work when receiving task 
close event
-        /// </summary>
-        private readonly TaskCloseCoordinator _taskCloseCoordinator;
-
-        /// <summary>
-        /// The cancellation token to control the group communication 
operation cancellation
-        /// </summary>
-        private readonly CancellationTokenSource _cancellationSource;
-
-        /// <summary>
         /// </summary>
         /// <param name="updateTask">The UpdateTask hosted in this REEF 
Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
         /// <param name="resultHandler">Result handler</param>
         /// <param name="taskCloseCoordinator">Task close Coordinator</param>
-        /// <param name="invokeGC">Whether to call Garbage Collector after 
each iteration or not</param>
+        /// <param name="invokeGc">Whether to call Garbage Collector after 
each iteration or not</param>
         /// <param name="taskId">task id</param>
         [Inject]
         private UpdateTaskHost(
@@ -86,180 +60,99 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
             IGroupCommClient groupCommunicationsClient,
             IIMRUResultHandler<TResult> resultHandler,
             TaskCloseCoordinator taskCloseCoordinator,
-            [Parameter(typeof(InvokeGC))] bool invokeGC,
-            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string 
taskId)
+            [Parameter(typeof(InvokeGC))] bool invokeGc,
+            [Parameter(typeof(TaskConfigurationOptions.Identifier))] string 
taskId) :
+            base(groupCommunicationsClient, taskCloseCoordinator, invokeGc)
         {
             Logger.Log(Level.Info, "Entering constructor of UpdateTaskHost for 
task id {0}", taskId);
             _updateTask = updateTask;
-            _groupCommunicationsClient = groupCommunicationsClient;
-            var cg = 
groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName);
             _dataAndControlMessageSender =
-                
cg.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
-            _dataReceiver = 
cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName);
-            _invokeGC = invokeGC;
+                
_communicationGroupClient.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName);
+            _dataReceiver = 
_communicationGroupClient.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName);
             _resultHandler = resultHandler;
-            _taskCloseCoordinator = taskCloseCoordinator;
-            _cancellationSource = new CancellationTokenSource();
             Logger.Log(Level.Info, "UpdateTaskHost initialized.");
         }
 
         /// <summary>
         /// Performs IMRU iterations on update side
         /// </summary>
-        /// <param name="memento"></param>
         /// <returns></returns>
-        public byte[] Call(byte[] memento)
+        protected override byte[] TaskBody(byte[] memento)
         {
-            Logger.Log(Level.Info, "Entering UpdateTaskHost Call().");
-            var updateResult = _updateTask.Initialize();
-            int iterNo = 0;
+            UpdateResult<TMapInput, TResult> updateResult = null;
             try
             {
-                while (updateResult.HasMapInput && 
!_cancellationSource.IsCancellationRequested)
-                {
-                    iterNo++;
-
-                    using (
-                        var message = new 
MapInputWithControlMessage<TMapInput>(updateResult.MapInput,
-                            MapControlMessage.AnotherRound))
-                    {
-                        _dataAndControlMessageSender.Send(message);
-                    }
-
-                    var input = _dataReceiver.Reduce(_cancellationSource);
-
-                    if (_invokeGC)
-                    {
-                        Logger.Log(Level.Verbose, "Calling Garbage Collector");
-                        GC.Collect();
-                        GC.WaitForPendingFinalizers();
-                    }
-
-                    updateResult = _updateTask.Update(input);
-
-                    if (updateResult.HasResult)
-                    {
-                        _resultHandler.HandleResult(updateResult.Result);
-                    }
-                }
-                if (!_cancellationSource.IsCancellationRequested)
-                {
-                    MapInputWithControlMessage<TMapInput> stopMessage =
-                        new 
MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
-                    _dataAndControlMessageSender.Send(stopMessage);
-                }
+                updateResult = _updateTask.Initialize();
             }
-            catch (OperationCanceledException e)
+            catch (Exception e)
             {
-                Logger.Log(Level.Warning,
-                    "Received OperationCanceledException in UpdateTaskHost 
with message: {0}.",
-                    e.Message);
+                HandleTaskAppException(e);
             }
-            catch (Exception e)
+
+            while (!_cancellationSource.IsCancellationRequested && 
updateResult.HasMapInput)
             {
-                if (e is IOException || e is TcpClientConnectionException || e 
is RemotingException ||
-                    e is SocketException)
+                using (
+                    var message = new 
MapInputWithControlMessage<TMapInput>(updateResult.MapInput,
+                        MapControlMessage.AnotherRound))
                 {
-                    Logger.Log(Level.Error,
-                        "Received Exception {0} in UpdateTaskHost with 
message: {1}. The cancellation token is: {2}.",
-                        e.GetType(),
-                        e.Message,
-                        _cancellationSource.IsCancellationRequested);
-                    if (!_cancellationSource.IsCancellationRequested)
-                    {
-                        Logger.Log(Level.Error,
-                            "UpdateTaskHost is throwing 
IMRUTaskGroupCommunicationException with cancellation token: {0}.",
-                            _cancellationSource.IsCancellationRequested);
-                        throw new 
IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
-                    }
+                    _dataAndControlMessageSender.Send(message);
                 }
-                else if (e is AggregateException)
+
+                var input = _dataReceiver.Reduce(_cancellationSource);
+
+                if (_invokeGc)
                 {
-                    Logger.Log(Level.Error,
-                        "Received AggregateException. The cancellation token 
is: {0}.",
-                        _cancellationSource.IsCancellationRequested);
-                    if (e.InnerException != null)
-                    {
-                        Logger.Log(Level.Error,
-                            "InnerException {0}, with message {1}.",
-                            e.InnerException.GetType(),
-                            e.InnerException.Message);
-                    }
-                    if (!_cancellationSource.IsCancellationRequested)
-                    {
-                        if (e.InnerException != null && e.InnerException is 
IOException)
-                        {
-                            Logger.Log(Level.Error,
-                                "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.",
-                                _cancellationSource.IsCancellationRequested);
-                            throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError);
-                        }
-                        else
-                        {
-                            throw e;
-                        }
-                    }
+                    Logger.Log(Level.Verbose, "Calling Garbage Collector");
+                    GC.Collect();
+                    GC.WaitForPendingFinalizers();
                 }
-                else
+
+                try
                 {
-                    Logger.Log(Level.Error,
-                       "UpdateTaskHost is throwing Excetion {0}, messge {1} with cancellation token: {2} and StackTrace {3}.",
-                       e.GetType(),
-                       e.Message,
-                       _cancellationSource.IsCancellationRequested,
-                       e.StackTrace);
-                    if (!_cancellationSource.IsCancellationRequested)
+                    updateResult = _updateTask.Update(input);
+                    if (updateResult.HasResult)
                     {
-                        throw e;
+                        _resultHandler.HandleResult(updateResult.Result);
                     }
                 }
-            }
-            finally
-            {
-                try
-                {
-                    _resultHandler.Dispose();
-                }
                 catch (Exception e)
                 {
-                    Logger.Log(Level.Error, "Exception in dispose result handler.", e);
-                    //// TODO throw proper exceptions JIRA REEF-1492
+                    HandleTaskAppException(e);
                 }
-                _taskCloseCoordinator.SignalTaskStopped();
-                Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested);
             }
 
-            return null;
-        }
+            if (!_cancellationSource.IsCancellationRequested)
+            {
+                MapInputWithControlMessage<TMapInput> stopMessage =
+                    new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop);
+                _dataAndControlMessageSender.Send(stopMessage);
+            }
 
-        /// <summary>
-        /// Task close handler. Call TaskCloseCoordinator to handle the event.
-        /// </summary>
-        /// <param name="closeEvent"></param>
-        public void OnNext(ICloseEvent closeEvent)
-        {
-            _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource);
+            return null;
         }
 
         /// <summary>
-        /// Dispose function. Dispose IGroupCommunicationsClient.
+        /// Dispose resultHandler
         /// </summary>
-        public void Dispose()
+        protected override void FinallyBlock()
         {
-            if (Interlocked.Exchange(ref _disposed, 1) == 0)
+            try
             {
-                _groupCommunicationsClient.Dispose();
+                _resultHandler.Dispose();
+            }
+            catch (Exception e)
+            {
+                Logger.Log(Level.Error, "Exception in dispose result handler.", e);
+                //// TODO throw proper exceptions JIRA REEF-1492
             }
         }
 
-        public void OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        public void OnCompleted()
+        /// <summary>
+        /// Return UpdateHostName
+        /// </summary>
+        protected override string TaskHostName
         {
-            throw new NotImplementedException();
+            get { return "UpdateTaskHost"; }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index cd3603a..d89d5a9 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -95,6 +95,7 @@ under the License.
     <Compile Include="OnREEF\IMRUTasks\IMRUTaskSystemException.cs" />
     <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" />
     <Compile Include="OnREEF\IMRUTasks\TaskCloseCoordinator.cs" />
+    <Compile Include="OnREEF\IMRUTasks\TaskHostBase.cs" />
     <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" />
     <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
index 5e38968..7fec0b7 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs
@@ -60,12 +60,13 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU
             var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage);
             var failedTaskCount = GetMessageCount(lines, FailedTaskMessage);
 
-            // on each try each task should fail or complete
+            // each task should fail or complete
             // there should be no failed evaluators
-            // and on each try all tasks should start successfully
-            Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedTaskCount);
+            // all tasks should start successfully
+            // No retry is done because IMRUAppTaskException is triggered by the failure in map task execution.
+            Assert.Equal(numTasks, completedTaskCount + failedTaskCount);
             Assert.Equal(0, failedEvaluatorCount);
-            Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount);
+            Assert.Equal(numTasks, runningTaskCount);
             CleanUp(testFolder);
         }
 

Reply via email to