Repository: reef Updated Branches: refs/heads/master 381553643 -> ca358730c
[REEF-1037] Clean up ContextRuntime and TaskRuntime This addressed the issue by * Making classes internal. * Fixing .NET Task usage patterns. * Format and clean up code. JIRA: [REEF-1037](https://issues.apache.org/jira/browse/REEF-1037) This closes #697 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ca358730 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ca358730 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ca358730 Branch: refs/heads/master Commit: ca358730c0af888d18568ecb1145d6973b8fbf2f Parents: 3815536 Author: Andrew Chung <[email protected]> Authored: Wed Dec 2 16:27:48 2015 -0800 Committer: Julia Wang <[email protected]> Committed: Mon Dec 7 15:17:43 2015 -0800 ---------------------------------------------------------------------- .../Runtime/Evaluator/Context/ContextRuntime.cs | 209 +++++-------- .../Evaluator/Context/RootContextLauncher.cs | 2 +- .../Runtime/Evaluator/Task/TaskRuntime.cs | 295 ++++++++----------- 3 files changed, 202 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/ca358730/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 8390df5..750f6ba 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -20,7 +20,7 @@ using System; using System.Collections.Generic; using System.Globalization; -using Org.Apache.REEF.Common.Context; +using System.Linq; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Task; using Org.Apache.REEF.Common.Tasks; @@ -30,7 +30,7 @@ using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { - public class ContextRuntime + internal sealed class ContextRuntime { private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime)); // Context-local injector. This contains information that will not be available in child injectors. @@ -41,12 +41,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context // Convenience class to hold all the event handlers for the context as well as the service instances. private readonly ContextLifeCycle _contextLifeCycle; + // The parent context, if any. + private readonly Optional<ContextRuntime> _parentContext; + // The child context, if any. private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty(); - // The parent context, if any. - private readonly Optional<ContextRuntime> _parentContext = Optional<ContextRuntime>.Empty(); - // The currently running task, if any. private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty(); @@ -63,11 +63,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context IConfiguration contextConfiguration, Optional<ContextRuntime> parentContext) { - ContextConfiguration config = contextConfiguration as ContextConfiguration; + var config = contextConfiguration as ContextConfiguration; if (config == null) { - var e = new ArgumentException("contextConfiguration is not of type ContextConfiguration"); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw( + new ArgumentException("contextConfiguration is not of type ContextConfiguration"), LOGGER); } _contextLifeCycle = new ContextLifeCycle(config.Id); _serviceInjector = serviceInjector; @@ -78,14 +78,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } catch (Exception e) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); Optional<string> parentId = ParentContext.IsPresent() ? Optional<string>.Of(ParentContext.Value.Id) : Optional<string>.Empty(); ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); } // Trigger the context start events on contextInjector. _contextLifeCycle.Start(); @@ -124,40 +124,40 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// <returns>a child context.</returns> public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration, IConfiguration serviceConfiguration) { - ContextRuntime childContext = null; lock (_contextLifeCycle) { if (_task.IsPresent()) { + // note: java code is putting thread id here var e = new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } if (_childContext.IsPresent()) { var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } try { IInjector childServiceInjector = _serviceInjector.ForkInjector(serviceConfiguration); - childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this)); + var childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this)); _childContext = Optional<ContextRuntime>.Of(childContext); return childContext; } catch (Exception e) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); Optional<string> parentId = ParentContext.IsPresent() ? Optional<string>.Of(ParentContext.Value.Id) : Optional<string>.Empty(); ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); } } - return childContext; + return null; } /// <summary> @@ -175,12 +175,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { var e = new InvalidOperationException( string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } if (_childContext.IsPresent()) { var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } IInjector childServiceInjector = _serviceInjector.ForkInjector(); ContextRuntime childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this)); @@ -199,45 +199,44 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { lock (_contextLifeCycle) { - bool taskPresent = _task.IsPresent(); - bool taskEnded = taskPresent && _task.Value.HasEnded(); + LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration) task is present: " + _task.IsPresent()); - LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration)" + "task is present: " + taskPresent + " task has ended: " + taskEnded); - if (taskPresent) + if (_task.IsPresent()) { LOGGER.Log(Level.Info, "Task state: " + _task.Value.GetTaskState()); - } + LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration) task has ended: " + _task.Value.HasEnded()); - if (taskEnded) - { - // clean up state - _task = Optional<TaskRuntime>.Empty(); - taskPresent = false; - } - if (taskPresent) - { - var e = new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + if (_task.Value.HasEnded()) + { + // clean up state + _task = Optional<TaskRuntime>.Empty(); + } + else + { + // note: java code is putting thread id here + var e = new InvalidOperationException( + string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } } + if (_childContext.IsPresent()) { var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } try { IInjector taskInjector = _contextInjector.ForkInjector(taskConfiguration.TangConfig); LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration.ToString()); - TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); // taskInjector.getInstance(TaskRuntime.class); - taskRuntime.Initialize(); - System.Threading.Tasks.Task.Run(new Action(taskRuntime.Start)); + TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); + taskRuntime.RunTask(); _task = Optional<TaskRuntime>.Of(taskRuntime); } catch (Exception e) { var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER); + Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER); } } } @@ -282,11 +281,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context if (!_task.IsPresent()) { LOGGER.Log(Level.Warning, "Received a suspend task while there was no task running. Ignored"); + return; } - else - { - _task.Value.Suspend(message); - } + _task.Value.Suspend(message); } } @@ -303,11 +300,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context if (!_task.IsPresent()) { LOGGER.Log(Level.Warning, "Received a close task while there was no task running. Ignored"); + return; } - else - { - _task.Value.Close(message); - } + _task.Value.Close(message); } } @@ -324,11 +319,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context if (!_task.IsPresent()) { LOGGER.Log(Level.Warning, "Received an task message while there was no task running. Ignored"); + return; } - else - { - _task.Value.Deliver(message); - } + _task.Value.Deliver(message); } } @@ -351,30 +344,27 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { lock (_contextLifeCycle) { - if (_task.IsPresent()) + if (!_task.IsPresent()) { - if (_task.Value.HasEnded()) - { - _task = Optional<TaskRuntime>.Empty(); - return Optional<TaskStatusProto>.Empty(); - } - else - { - TaskStatusProto taskStatusProto = _task.Value.GetStatusProto(); - if (taskStatusProto.state == State.RUNNING) - { - // only RUNNING status is allowed to rurn here, all other state pushed out to heartbeat - return Optional<TaskStatusProto>.Of(taskStatusProto); - } - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task state must be RUNNING, but instead is in {0} state", taskStatusProto.state)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - return Optional<TaskStatusProto>.Empty(); - } + return Optional<TaskStatusProto>.Empty(); } - else + + if (_task.Value.HasEnded()) { + _task = Optional<TaskRuntime>.Empty(); return Optional<TaskStatusProto>.Empty(); } + + var taskStatusProto = _task.Value.GetStatusProto(); + if (taskStatusProto.state == State.RUNNING) + { + // only RUNNING status is allowed to rurn here, all other state pushed out to heartbeat + return Optional<TaskStatusProto>.Of(taskStatusProto); + } + + var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task state must be RUNNING, but instead is in {0} state", taskStatusProto.state)); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + return Optional<TaskStatusProto>.Empty(); } } @@ -389,11 +379,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { _childContext = Optional<ContextRuntime>.Empty(); } - else - { - var e = new InvalidOperationException("no child context set"); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } + Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("no child context set"), LOGGER); } } @@ -405,7 +391,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { lock (_contextLifeCycle) { - ContextStatusProto contextStatusProto = new ContextStatusProto() + var contextStatusProto = new ContextStatusProto { context_id = Id, context_state = _contextState, @@ -415,20 +401,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context contextStatusProto.parent_id = _parentContext.Value.Id; } - foreach (IContextMessageSource source in _contextLifeCycle.ContextMessageSources) + foreach (var sourceMessage in _contextLifeCycle.ContextMessageSources.Where(src => src.Message.IsPresent()).Select(src => src.Message.Value)) { - Optional<ContextMessage> contextMessageOptional = source.Message; - if (contextMessageOptional.IsPresent()) + var contextMessageProto = new ContextStatusProto.ContextMessageProto { - ContextStatusProto.ContextMessageProto contextMessageProto - = new ContextStatusProto.ContextMessageProto() - { - source_id = contextMessageOptional.Value.MessageSourceId, - }; - contextMessageProto.message = ByteUtilities.CopyBytesFrom(contextMessageOptional.Value.Bytes); - contextStatusProto.context_message.Add(contextMessageProto); - } + source_id = sourceMessage.MessageSourceId, + message = ByteUtilities.CopyBytesFrom(sourceMessage.Bytes), + }; + contextStatusProto.context_message.Add(contextMessageProto); } + return contextStatusProto; } } @@ -461,52 +443,3 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } } } - ////<summary> - ////TODO: remove and use parameterless GetContextStatus above - ////</summary> - ////<returns>this context's status in protocol buffer form.</returns> - ////public ContextStatusProto GetContextStatus(string contextId) - ////{ - //// ContextStatusProto contextStatusProto = new ContextStatusProto() - //// { - //// context_id = contextId, - //// context_state = _contextState, - //// }; - //// return contextStatusProto; - ////} - - ////TODO: remove and use injection - ////public void StartTask(ITask task, HeartBeatManager heartBeatManager, string taskId, string contextId) - ////{ - //// lock (_contextLifeCycle) - //// { - //// if (_task.IsPresent() && _task.Value.HasEnded()) - //// { - //// // clean up state - //// _task = Optional<TaskRuntime>.Empty(); - //// } - //// if (_task.IsPresent()) - //// { - //// throw new InvalidOperationException( - //// string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here - //// } - //// if (_childContext.IsPresent()) - //// { - //// throw new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); - //// } - //// try - //// { - //// // final Injector taskInjector = contextInjector.forkInjector(taskConfiguration); - //// TaskRuntime taskRuntime // taskInjector.getInstance(TaskRuntime.class); - //// = new TaskRuntime(task, heartBeatManager); - //// LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Starting task '{0}'", taskId)); - //// taskRuntime.Initialize(taskId, contextId); - //// taskRuntime.Start(); - //// _task = Optional<TaskRuntime>.Of(taskRuntime); - //// } - //// catch (Exception e) - //// { - //// throw new InvalidOperationException("Unable to instantiate the new task"); - //// } - //// } - ////} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/ca358730/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs index 151ce0e..8fd52eb 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs @@ -56,7 +56,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context set { _rootContextConfiguration = value; } } - public ContextRuntime GetRootContext() + internal ContextRuntime GetRootContext() { if (_rootContext == null) { http://git-wip-us.apache.org/repos/asf/reef/blob/ca358730/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index b695e95..a60841f 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -20,6 +20,7 @@ using System; using System.Collections.Generic; using System.Globalization; +using System.Threading; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; @@ -31,58 +32,39 @@ using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { - public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage> + internal sealed class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage> { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime)); - - private readonly ITask _task; + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskRuntime)); private readonly IInjector _injector; - - // The memento given by the task configuration - private readonly Optional<byte[]> _memento; - - private readonly HeartBeatManager _heartBeatManager; - private readonly TaskStatus _currentStatus; - - private readonly INameClient _nameClient; - private readonly Lazy<IDriverConnectionMessageHandler> _driverConnectionMessageHandler; + private readonly Lazy<IDriverMessageHandler> _driverMessageHandler; + private int taskRan = 0; - public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null) + public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager) { _injector = taskInjector; - _heartBeatManager = heartBeatManager; - Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty(); - try - { - _task = _injector.GetInstance<ITask>(); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER); - } + var messageSources = Optional<ISet<ITaskMessageSource>>.Empty(); try { ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>(); - messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource }); + messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource> { taskMessageSource }); } catch (Exception e) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER); + Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, Logger); // do not rethrow since this is benign } try { - _nameClient = _injector.GetInstance<INameClient>(); - _heartBeatManager.EvaluatorSettings.NameClient = _nameClient; + heartBeatManager.EvaluatorSettings.NameClient = _injector.GetInstance<INameClient>(); } catch (InjectionException) { - LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration."); // do not rethrow since user is not required to provide name client + Logger.Log(Level.Warning, "Cannot inject name client from task configuration."); } _driverConnectionMessageHandler = new Lazy<IDriverConnectionMessageHandler>(() => @@ -93,16 +75,28 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } catch (InjectionException) { - LOGGER.Log(Level.Info, "User did not implement IDriverConnectionMessageHandler."); + Logger.Log(Level.Info, "User did not implement IDriverConnectionMessageHandler."); + } + + return null; + }); + + _driverMessageHandler = new Lazy<IDriverMessageHandler>(() => + { + try + { + return _injector.GetInstance<IDriverMessageHandler>(); + } + catch (InjectionException ie) + { + Utilities.Diagnostics.Exceptions.CaughtAndThrow(ie, Level.Error, "Received Driver message, but unable to inject handler for driver message ", Logger); } return null; }); - LOGGER.Log(Level.Info, "task message source injected"); - _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources); - _memento = memento == null ? - Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento)); + Logger.Log(Level.Info, "task message source injected"); + _currentStatus = new TaskStatus(heartBeatManager, contextId, taskId, messageSources); } public string TaskId @@ -115,45 +109,65 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task get { return _currentStatus.ContextId; } } - public void Initialize() - { - _currentStatus.SetRunning(); - } - /// <summary> - /// Run the task + /// Runs the task asynchronously. /// </summary> - public void Start() + public void RunTask() { - try + if (Interlocked.Exchange(ref taskRan, 1) != 0) { - LOGGER.Log(Level.Info, "Call Task"); - if (_currentStatus.IsNotRunning()) - { - var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - - byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null; - var result = RunTask(taskMemento); - - LOGGER.Log(Level.Info, "Task Call Finished"); - _currentStatus.SetResult(result); - if (result != null && result.Length > 0) - { - LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); - } + // Return if we have already called RunTask + throw new InvalidOperationException("TaskRun has already been called on TaskRuntime."); } - catch (Exception e) + + _currentStatus.SetRunning(); + ITask userTask; + try { - LOGGER.Log(Level.Warning, - string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e)); - _currentStatus.SetException(e); + userTask = _injector.GetInstance<ITask>(); } - finally + catch (Exception e) { - _task.Dispose(); + Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", Logger); + return; } + + System.Threading.Tasks.Task.Run(() => userTask.Call(null)).ContinueWith( + runTask => + { + try + { + // Task failed. + if (runTask.IsFaulted) + { + Logger.Log(Level.Warning, + string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", runTask.Exception)); + _currentStatus.SetException(runTask.Exception); + return; + } + + if (runTask.IsCanceled) + { + Logger.Log(Level.Warning, + string.Format(CultureInfo.InvariantCulture, "Task failed caused by task cancellation")); + return; + } + + // Task completed. + var result = runTask.Result; + Logger.Log(Level.Info, "Task Call Finished"); + _currentStatus.SetResult(result); + if (result != null && result.Length > 0) + { + Logger.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); + } + } + finally + { + userTask.Dispose(); + runTask.Dispose(); + } + }); } public TaskState GetTaskState() @@ -175,60 +189,45 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task return _currentStatus.HasEnded(); } - /// <summary> - /// get ID of the task. - /// </summary> - /// <returns>ID of the task.</returns> - public string GetActicityId() - { - return _currentStatus.TaskId; - } - public void Close(byte[] message) { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId)); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId)); if (_currentStatus.IsNotRunning()) { - LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State)); + Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State)); + return; } - else + try { - try - { - OnNext(new CloseEventImpl(message)); - _currentStatus.SetCloseRequested(); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER); - - _currentStatus.SetException( - new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e)); - } + OnNext(new CloseEventImpl(message)); + _currentStatus.SetCloseRequested(); + } + catch (Exception e) + { + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", Logger); + _currentStatus.SetException(new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e)); } } public void Suspend(byte[] message) { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId)); - + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId)); + if (_currentStatus.IsNotRunning()) { - LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State)); + Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State)); + return; } - else + try { - try - { - OnNext(new SuspendEventImpl(message)); - _currentStatus.SetSuspendRequested(); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER); - _currentStatus.SetException( - new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e)); - } + OnNext(new SuspendEventImpl(message)); + _currentStatus.SetSuspendRequested(); + } + catch (Exception e) + { + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", Logger); + _currentStatus.SetException( + new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e)); } } @@ -236,88 +235,49 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { if (_currentStatus.IsNotRunning()) { - LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State)); + Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State)); + return; } - else + try { - try - { - OnNext(new DriverMessageImpl(message)); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER); - _currentStatus.SetException( - new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e)); - } + OnNext(new DriverMessageImpl(message)); + } + catch (Exception e) + { + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", Logger); + _currentStatus.SetException( + new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e)); } } public void OnNext(ICloseEvent value) { - LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)"); + Logger.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)"); // TODO: send a heartbeat } - void IObserver<ICloseEvent>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<IDriverMessage>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<IDriverMessage>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<ISuspendEvent>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<ISuspendEvent>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<ICloseEvent>.OnCompleted() - { - throw new NotImplementedException(); - } - public void OnNext(ISuspendEvent value) { - LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)"); + Logger.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)"); // TODO: send a heartbeat } public void OnNext(IDriverMessage value) { - IDriverMessageHandler messageHandler = null; - LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)"); - try + Logger.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)"); + + if (_driverMessageHandler.Value == null) { - messageHandler = _injector.GetInstance<IDriverMessageHandler>(); + return; } - catch (Exception e) + try { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER); + _driverMessageHandler.Value.Handle(value); } - if (messageHandler != null) + catch (Exception e) { - try - { - messageHandler.Handle(value); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER); - _currentStatus.RecordExecptionWithoutHeartbeat(e); - } + Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, Logger); + _currentStatus.RecordExecptionWithoutHeartbeat(e); } } @@ -334,9 +294,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task _driverConnectionMessageHandler.Value.OnNext(message); } - private byte[] RunTask(byte[] memento) + public void OnError(Exception error) { - return _task.Call(memento); + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); } } } \ No newline at end of file
