Repository: reef Updated Branches: refs/heads/master 9b034dd3c -> 1c7adac04
[REEF-1550] Clean up task exceptions in IMRU task hosts * Add base class TaskHostBase to hold common IMRU configurations and exception handling logic * Refactor MapTaskHost and UpdateTaskHost * Update test cases JIRA: [REEF-1550](https://issues.apache.org/jira/browse/REEF-1550) Pull request: This closes #1113 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1c7adac0 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1c7adac0 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1c7adac0 Branch: refs/heads/master Commit: 1c7adac041243973f53ec98b0cf8cd468c58dd93 Parents: 9b034dd Author: Julia Wang <[email protected]> Authored: Fri Sep 9 14:13:41 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Fri Sep 9 16:50:20 2016 -0700 ---------------------------------------------------------------------- .../OnREEF/IMRUTasks/MapTaskHost.cs | 175 +++------------ .../OnREEF/IMRUTasks/TaskHostBase.cs | 223 +++++++++++++++++++ .../OnREEF/IMRUTasks/UpdateTaskHost.cs | 211 +++++------------- .../Org.Apache.REEF.IMRU.csproj | 1 + .../Functional/IMRU/TestFailMapperTasks.cs | 9 +- 5 files changed, 309 insertions(+), 310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs index ca2fb85..f843036 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs @@ -16,10 +16,6 @@ // under the License. 
using System; -using System.IO; -using System.Net.Sockets; -using System.Runtime.Remoting; -using System.Threading; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.IMRU.API; @@ -31,7 +27,6 @@ using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote.Impl; namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks { @@ -41,197 +36,86 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <typeparam name="TMapInput">Map input</typeparam> /// <typeparam name="TMapOutput">Map output</typeparam> [ThreadSafe] - internal sealed class MapTaskHost<TMapInput, TMapOutput> : ITask, IObserver<ICloseEvent> + internal sealed class MapTaskHost<TMapInput, TMapOutput> : TaskHostBase, ITask, IObserver<ICloseEvent> { private static readonly Logger Logger = Logger.GetLogger(typeof(MapTaskHost<TMapInput, TMapOutput>)); private readonly IBroadcastReceiver<MapInputWithControlMessage<TMapInput>> _dataAndMessageReceiver; private readonly IReduceSender<TMapOutput> _dataReducer; private readonly IMapFunction<TMapInput, TMapOutput> _mapTask; - private readonly bool _invokeGC; - - /// <summary> - /// Shows if the object has been disposed.
- /// </summary> - private int _disposed = 0; - - /// <summary> - /// Group Communication client for the task - /// </summary> - private readonly IGroupCommClient _groupCommunicationsClient; - - /// <summary> - /// Task close Coordinator to handle the work when receiving task close event - /// </summary> - private readonly TaskCloseCoordinator _taskCloseCoordinator; - - /// <summary> - /// The cancellation token to control the group communication operation cancellation - /// </summary> - private readonly CancellationTokenSource _cancellationSource; /// <summary> /// </summary> /// <param name="mapTask">The MapTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> /// <param name="taskCloseCoordinator">Task close Coordinator</param> - /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> + /// <param name="invokeGc">Whether to call Garbage Collector after each iteration or not</param> /// <param name="taskId">task id</param> [Inject] private MapTaskHost( IMapFunction<TMapInput, TMapOutput> mapTask, IGroupCommClient groupCommunicationsClient, TaskCloseCoordinator taskCloseCoordinator, - [Parameter(typeof(InvokeGC))] bool invokeGC, - [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId) + [Parameter(typeof(InvokeGC))] bool invokeGc, + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId) : + base(groupCommunicationsClient, taskCloseCoordinator, invokeGc) { Logger.Log(Level.Info, "Entering constructor of MapTaskHost for task id {0}", taskId); _mapTask = mapTask; - _groupCommunicationsClient = groupCommunicationsClient; - var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); _dataAndMessageReceiver = - cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); - _dataReducer = cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName);
- _invokeGC = invokeGC; - _taskCloseCoordinator = taskCloseCoordinator; - _cancellationSource = new CancellationTokenSource(); + _communicationGroupClient.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); + _dataReducer = _communicationGroupClient.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName); Logger.Log(Level.Info, "MapTaskHost initialized."); } /// <summary> /// Performs IMRU iterations on map side /// </summary> - /// <param name="memento"></param> /// <returns></returns> - public byte[] Call(byte[] memento) + protected override byte[] TaskBody(byte[] memento) { - Logger.Log(Level.Info, "Entering MapTaskHost Call()."); MapControlMessage controlMessage = MapControlMessage.AnotherRound; - try + while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop) { - while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop) + if (_invokeGc) { - if (_invokeGC) - { - Logger.Log(Level.Verbose, "Calling Garbage Collector"); - GC.Collect(); - GC.WaitForPendingFinalizers(); - } - - using ( - MapInputWithControlMessage<TMapInput> mapInput = - _dataAndMessageReceiver.Receive(_cancellationSource)) - { - controlMessage = mapInput.ControlMessage; - if (controlMessage != MapControlMessage.Stop) - { - _dataReducer.Send(_mapTask.Map(mapInput.Message), _cancellationSource); - } - } + Logger.Log(Level.Verbose, "Calling Garbage Collector"); + GC.Collect(); + GC.WaitForPendingFinalizers(); } - } - catch (OperationCanceledException e) - { - Logger.Log(Level.Warning, - "Received OperationCanceledException in MapTaskHost with message: {0}.
The cancellation token is: {1}.", - e.Message, - _cancellationSource.IsCancellationRequested); - } - catch (Exception e) - { - if (e is IOException || e is TcpClientConnectionException || e is RemotingException || - e is SocketException) - { - Logger.Log(Level.Error, - "Received Exception {0} in MapTaskHost with message: {1}. The cancellation token is: {2}.", - e.GetType(), - e.Message, - _cancellationSource.IsCancellationRequested); - if (!_cancellationSource.IsCancellationRequested) - { - Logger.Log(Level.Error, - "MapTask is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.", - _cancellationSource.IsCancellationRequested); - throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); - } - } - else if (e is AggregateException) + + using ( + MapInputWithControlMessage<TMapInput> mapInput = + _dataAndMessageReceiver.Receive(_cancellationSource)) { - Logger.Log(Level.Error, - "Received AggregateException. The cancellation token is: {0}.", - _cancellationSource.IsCancellationRequested); - if (e.InnerException != null) - { - Logger.Log(Level.Error, - "InnerException {0}, with message {1}.", - e.InnerException.GetType(), - e.InnerException.Message); - } - if (!_cancellationSource.IsCancellationRequested) + controlMessage = mapInput.ControlMessage; + if (controlMessage != MapControlMessage.Stop) { - if (e.InnerException != null && e.InnerException is IOException) + TMapOutput output = default(TMapOutput); + try { - Logger.Log(Level.Error, - "MapTask is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.", - _cancellationSource.IsCancellationRequested); - throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); + output = _mapTask.Map(mapInput.Message); } - else + catch (Exception e) { - throw e; + HandleTaskAppException(e); + // HandleTaskAppException returns only if cancellation was requested; + // stop here rather than sending a default output to the reducer. + return null; } - } - } - else - { - Logger.Log(Level.Error, - "MapTask is throwing Exception {0}, message {1} with cancellation token: {2} and StackTrace
{3}.", - e.GetType(), - e.Message, - _cancellationSource.IsCancellationRequested, - e.StackTrace); - if (!_cancellationSource.IsCancellationRequested) - { - throw e; + _dataReducer.Send(output, _cancellationSource); } } } - finally - { - _taskCloseCoordinator.SignalTaskStopped(); - } - Logger.Log(Level.Info, "MapTaskHost returned with cancellation token:{0}.", _cancellationSource.IsCancellationRequested); return null; } /// <summary> - /// Task close handler. Calls TaskCloseCoordinator to handle the event. + /// Return MapTaskHost name /// </summary> - /// <param name="closeEvent"></param> - public void OnNext(ICloseEvent closeEvent) - { - _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource); - } - - /// <summary> - /// Dispose function. Dispose IGroupCommunicationsClient. - /// </summary> - public void Dispose() - { - if (Interlocked.Exchange(ref _disposed, 1) == 0) - { - _groupCommunicationsClient.Dispose(); - } - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() + protected override string TaskHostName { - throw new NotImplementedException(); + get { return "MapTaskHost"; } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs new file mode 100644 index 0000000..d025528 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskHostBase.cs @@ -0,0 +1,223 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.IO; +using System.Net.Sockets; +using System.Runtime.Remoting; +using System.Threading; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.Network.Group.Task; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Remote.Impl; + +namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks +{ + internal abstract class TaskHostBase + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskHostBase)); + + /// <summary> + /// Shows if the object has been disposed.
+ /// </summary> + private int _disposed; + + /// <summary> + /// Group Communication client for the task + /// </summary> + private readonly IGroupCommClient _groupCommunicationsClient; + + /// <summary> + /// Task close Coordinator to handle the work when receiving task close event + /// </summary> + private readonly TaskCloseCoordinator _taskCloseCoordinator; + + /// <summary> + /// Specifies whether to invoke the garbage collector or not + /// </summary> + protected readonly bool _invokeGc; + + /// <summary> + /// CommunicationGroupClient for the task + /// </summary> + protected readonly ICommunicationGroupClient _communicationGroupClient; + + /// <summary> + /// The cancellation token to control the group communication operation cancellation + /// </summary> + protected readonly CancellationTokenSource _cancellationSource; + + /// <summary> + /// Initializes the state shared by the map and update task hosts + /// </summary> + /// <param name="groupCommunicationsClient">Group Communication Client</param> + /// <param name="taskCloseCoordinator">The class that handles the close event for the task</param> + /// <param name="invokeGc">Whether to invoke the garbage collector or not</param> + protected TaskHostBase( + IGroupCommClient groupCommunicationsClient, + TaskCloseCoordinator taskCloseCoordinator, + bool invokeGc) + { + _groupCommunicationsClient = groupCommunicationsClient; + _communicationGroupClient = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); + + _invokeGc = invokeGc; + _taskCloseCoordinator = taskCloseCoordinator; + _cancellationSource = new CancellationTokenSource(); + } + + /// <summary> + /// Runs TaskBody() and translates its exceptions: IMRUTaskAppException is rethrown, communication errors become + /// IMRUTaskGroupCommunicationException, and all others default to IMRUTaskSystemException to make them recoverable. + /// </summary> + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, "Entering {0} Call().", TaskHostName); + try + { + return TaskBody(memento); + } + catch (Exception e) + { + if (e is
IMRUTaskAppException) + { + throw; + } + if (IsCommunicationException(e)) + { + HandleCommunicationException(e); + } + else + { + HandleSystemException(e); + } + } + finally + { + FinallyBlock(); + _taskCloseCoordinator.SignalTaskStopped(); + } + Logger.Log(Level.Info, "{0} returned with cancellation token:{1}.", TaskHostName, _cancellationSource.IsCancellationRequested); + return null; + } + + private static bool IsCommunicationException(Exception e) + { + if (e is OperationCanceledException || e is IOException || e is TcpClientConnectionException || + e is RemotingException || e is SocketException || + (e is AggregateException && e.InnerException is IOException)) + { + return true; + } + return false; + } + + /// <summary> + /// The body of the Call() method. Subclasses must override it. + /// </summary> + protected abstract byte[] TaskBody(byte[] memento); + + /// <summary> + /// Runs in Call()'s finally block before SignalTaskStopped(); overrides must not throw, or SignalTaskStopped() is skipped. + /// </summary> + protected virtual void FinallyBlock() + { + } + + /// <summary> + /// Task host name + /// </summary> + protected abstract string TaskHostName { get; } + + /// <summary> + /// Task close handler. Call TaskCloseCoordinator to handle the event. + /// </summary> + public void OnNext(ICloseEvent closeEvent) + { + _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource); + } + + /// <summary> + /// Disposes the IGroupCommClient on the first call; later calls do nothing.
+ /// </summary> + public void Dispose() + { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + _groupCommunicationsClient.Dispose(); + } + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + /// <summary> + /// Convert the exception into IMRUTaskGroupCommunicationException and throw it unless cancellation was requested + /// </summary> + protected void HandleCommunicationException(Exception e) + { + HandleException(e, new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError, e)); + } + + /// <summary> + /// Convert the exception into IMRUTaskSystemException and throw it unless cancellation was requested + /// </summary> + protected void HandleSystemException(Exception e) + { + HandleException(e, new IMRUTaskSystemException(TaskManager.TaskSystemError, e)); + } + + /// <summary> + /// Convert the exception into IMRUTaskAppException and throw it unless cancellation was requested + /// </summary> + protected void HandleTaskAppException(Exception e) + { + HandleException(e, new IMRUTaskAppException(TaskManager.TaskAppError, e)); + } + + /// <summary> + /// Log and throw target exception if cancellation token is not set. + /// In the cancellation case, simply log and return. + /// </summary> + private void HandleException(Exception originalException, Exception targetException) + { + Logger.Log(Level.Error, + "Received Exception {0} in {1} with message: {2}.
The cancellation token is: {3}.", + originalException.GetType(), + TaskHostName, + originalException.Message, + _cancellationSource.IsCancellationRequested); + if (!_cancellationSource.IsCancellationRequested) + { + Logger.Log(Level.Error, + "{0} is throwing {1} with cancellation token: {2}.", + TaskHostName, + targetException.GetType(), + _cancellationSource.IsCancellationRequested); + throw targetException; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index 01a2bdb..382721c 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -16,10 +16,6 @@ // under the License.
using System; -using System.IO; -using System.Net.Sockets; -using System.Runtime.Remoting; -using System.Threading; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.IMRU.API; @@ -31,7 +27,6 @@ using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote.Impl; namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks { @@ -42,43 +37,22 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <typeparam name="TMapOutput">Map output</typeparam> /// <typeparam name="TResult">Final result</typeparam> [ThreadSafe] - internal sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : ITask, IObserver<ICloseEvent> + internal sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : TaskHostBase, ITask, IObserver<ICloseEvent> { private static readonly Logger Logger = Logger.GetLogger(typeof(UpdateTaskHost<TMapInput, TMapOutput, TResult>)); private readonly IReduceReceiver<TMapOutput> _dataReceiver; private readonly IBroadcastSender<MapInputWithControlMessage<TMapInput>> _dataAndControlMessageSender; private readonly IUpdateFunction<TMapInput, TMapOutput, TResult> _updateTask; - private readonly bool _invokeGC; private readonly IIMRUResultHandler<TResult> _resultHandler; /// <summary> - /// Shows if the object has been disposed.
- /// </summary> - private int _disposed = 0; - - /// <summary> - /// Group Communication client for the task - /// </summary> - private readonly IGroupCommClient _groupCommunicationsClient; - - /// <summary> - /// Task close Coordinator to handle the work when receiving task close event - /// </summary> - private readonly TaskCloseCoordinator _taskCloseCoordinator; - - /// <summary> - /// The cancellation token to control the group communication operation cancellation - /// </summary> - private readonly CancellationTokenSource _cancellationSource; - - /// <summary> /// </summary> /// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> /// <param name="resultHandler">Result handler</param> /// <param name="taskCloseCoordinator">Task close Coordinator</param> - /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> + /// <param name="invokeGc">Whether to call Garbage Collector after each iteration or not</param> /// <param name="taskId">task id</param> [Inject] private UpdateTaskHost( @@ -86,180 +60,99 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks IGroupCommClient groupCommunicationsClient, IIMRUResultHandler<TResult> resultHandler, TaskCloseCoordinator taskCloseCoordinator, - [Parameter(typeof(InvokeGC))] bool invokeGC, - [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId) + [Parameter(typeof(InvokeGC))] bool invokeGc, + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId) : + base(groupCommunicationsClient, taskCloseCoordinator, invokeGc) { Logger.Log(Level.Info, "Entering constructor of UpdateTaskHost for task id {0}", taskId); _updateTask = updateTask; - _groupCommunicationsClient = groupCommunicationsClient; - var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); _dataAndControlMessageSender = -
cg.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); - _dataReceiver = cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName); - _invokeGC = invokeGC; + _communicationGroupClient.GetBroadcastSender<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); + _dataReceiver = _communicationGroupClient.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName); _resultHandler = resultHandler; - _taskCloseCoordinator = taskCloseCoordinator; - _cancellationSource = new CancellationTokenSource(); Logger.Log(Level.Info, "UpdateTaskHost initialized."); } /// <summary> /// Performs IMRU iterations on update side /// </summary> - /// <param name="memento"></param> /// <returns></returns> - public byte[] Call(byte[] memento) + protected override byte[] TaskBody(byte[] memento) { - Logger.Log(Level.Info, "Entering UpdateTaskHost Call()."); - var updateResult = _updateTask.Initialize(); - int iterNo = 0; + UpdateResult<TMapInput, TResult> updateResult = null; try { - while (updateResult.HasMapInput && !_cancellationSource.IsCancellationRequested) - { - iterNo++; - - using ( - var message = new MapInputWithControlMessage<TMapInput>(updateResult.MapInput, - MapControlMessage.AnotherRound)) - { - _dataAndControlMessageSender.Send(message); - } - - var input = _dataReceiver.Reduce(_cancellationSource); - - if (_invokeGC) - { - Logger.Log(Level.Verbose, "Calling Garbage Collector"); - GC.Collect(); - GC.WaitForPendingFinalizers(); - } - - updateResult = _updateTask.Update(input); - - if (updateResult.HasResult) - { - _resultHandler.HandleResult(updateResult.Result); - } - } - if (!_cancellationSource.IsCancellationRequested) - { - MapInputWithControlMessage<TMapInput> stopMessage = - new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop); - _dataAndControlMessageSender.Send(stopMessage); - } + updateResult = _updateTask.Initialize(); } - catch (OperationCanceledException e) + catch (Exception
e) { - Logger.Log(Level.Warning, - "Received OperationCanceledException in UpdateTaskHost with message: {0}.", - e.Message); + HandleTaskAppException(e); } - catch (Exception e) + + while (!_cancellationSource.IsCancellationRequested && updateResult.HasMapInput) { - if (e is IOException || e is TcpClientConnectionException || e is RemotingException || - e is SocketException) + using ( + var message = new MapInputWithControlMessage<TMapInput>(updateResult.MapInput, + MapControlMessage.AnotherRound)) { - Logger.Log(Level.Error, - "Received Exception {0} in UpdateTaskHost with message: {1}. The cancellation token is: {2}.", - e.GetType(), - e.Message, - _cancellationSource.IsCancellationRequested); - if (!_cancellationSource.IsCancellationRequested) - { - Logger.Log(Level.Error, - "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.", - _cancellationSource.IsCancellationRequested); - throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); - } + _dataAndControlMessageSender.Send(message); } - else if (e is AggregateException) + + var input = _dataReceiver.Reduce(_cancellationSource); + + if (_invokeGc) { - Logger.Log(Level.Error, - "Received AggregateException.
The cancellation token is: {0}.", - _cancellationSource.IsCancellationRequested); - if (e.InnerException != null) - { - Logger.Log(Level.Error, - "InnerException {0}, with message {1}.", - e.InnerException.GetType(), - e.InnerException.Message); - } - if (!_cancellationSource.IsCancellationRequested) - { - if (e.InnerException != null && e.InnerException is IOException) - { - Logger.Log(Level.Error, - "UpdateTaskHost is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.", - _cancellationSource.IsCancellationRequested); - throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); - } - else - { - throw e; - } - } + Logger.Log(Level.Verbose, "Calling Garbage Collector"); + GC.Collect(); + GC.WaitForPendingFinalizers(); } - else + + try { - Logger.Log(Level.Error, - "UpdateTaskHost is throwing Excetion {0}, messge {1} with cancellation token: {2} and StackTrace {3}.", - e.GetType(), - e.Message, - _cancellationSource.IsCancellationRequested, - e.StackTrace); - if (!_cancellationSource.IsCancellationRequested) + updateResult = _updateTask.Update(input); + if (updateResult.HasResult) { - throw e; + _resultHandler.HandleResult(updateResult.Result); } } - } - finally - { - try - { - _resultHandler.Dispose(); - } catch (Exception e) { - Logger.Log(Level.Error, "Exception in dispose result handler.", e); - //// TODO throw proper exceptions JIRA REEF-1492 + HandleTaskAppException(e); } - _taskCloseCoordinator.SignalTaskStopped(); - Logger.Log(Level.Info, "UpdateTaskHost returned with cancellation token {0}.", _cancellationSource.IsCancellationRequested); } - return null; - } + if (!_cancellationSource.IsCancellationRequested) + { + MapInputWithControlMessage<TMapInput> stopMessage = + new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop); + _dataAndControlMessageSender.Send(stopMessage); + } - /// <summary> - /// Task close handler. Call TaskCloseCoordinator to handle the event.
- /// </summary> - /// <param name="closeEvent"></param> - public void OnNext(ICloseEvent closeEvent) - { - _taskCloseCoordinator.HandleEvent(closeEvent, _cancellationSource); + return null; } /// <summary> - /// Dispose function. Dispose IGroupCommunicationsClient. + /// Disposes the result handler; an exception thrown by its Dispose is logged, not rethrown /// </summary> - public void Dispose() + protected override void FinallyBlock() { - if (Interlocked.Exchange(ref _disposed, 1) == 0) + try { - _groupCommunicationsClient.Dispose(); + _resultHandler.Dispose(); + } + catch (Exception e) + { + Logger.Log(Level.Error, "Exception in dispose result handler.", e); + //// TODO throw proper exceptions JIRA REEF-1492 } } - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() + /// <summary> + /// Return UpdateTaskHost name + /// </summary> + protected override string TaskHostName { - throw new NotImplementedException(); + get { return "UpdateTaskHost"; } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index cd3603a..d89d5a9 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -95,6 +95,7 @@ under the License.
<Compile Include="OnREEF\IMRUTasks\IMRUTaskSystemException.cs" /> <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" /> <Compile Include="OnREEF\IMRUTasks\TaskCloseCoordinator.cs" /> + <Compile Include="OnREEF\IMRUTasks\TaskHostBase.cs" /> <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1c7adac0/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs index 5e38968..7fec0b7 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailMapperTasks.cs @@ -60,12 +60,13 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU var failedEvaluatorCount = GetMessageCount(lines, FailedEvaluatorMessage); var failedTaskCount = GetMessageCount(lines, FailedTaskMessage); - // on each try each task should fail or complete + // each task should fail or complete // there should be no failed evaluators - // and on each try all tasks should start successfully - Assert.Equal((NumberOfRetry + 1) * numTasks, completedTaskCount + failedTaskCount); + // all tasks should start successfully + // No retry is done because the failure in map task execution is reported as an IMRUTaskAppException. + Assert.Equal(numTasks, completedTaskCount + failedTaskCount); Assert.Equal(0, failedEvaluatorCount); - Assert.Equal((NumberOfRetry + 1) * numTasks, runningTaskCount); + Assert.Equal(numTasks, runningTaskCount); CleanUp(testFolder); }
