Repository: reef
Updated Branches:
  refs/heads/master 381553643 -> ca358730c


[REEF-1037] Clean up ContextRuntime and TaskRuntime

This addresses the issue by:
  * Making classes internal.
  * Fixing .NET Task usage patterns.
  * Formatting and cleaning up code.

JIRA:
  [REEF-1037](https://issues.apache.org/jira/browse/REEF-1037)

This closes #697


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/ca358730
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/ca358730
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/ca358730

Branch: refs/heads/master
Commit: ca358730c0af888d18568ecb1145d6973b8fbf2f
Parents: 3815536
Author: Andrew Chung <[email protected]>
Authored: Wed Dec 2 16:27:48 2015 -0800
Committer: Julia Wang <[email protected]>
Committed: Mon Dec 7 15:17:43 2015 -0800

----------------------------------------------------------------------
 .../Runtime/Evaluator/Context/ContextRuntime.cs | 209 +++++--------
 .../Evaluator/Context/RootContextLauncher.cs    |   2 +-
 .../Runtime/Evaluator/Task/TaskRuntime.cs       | 295 ++++++++-----------
 3 files changed, 202 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/ca358730/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
index 8390df5..750f6ba 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
@@ -20,7 +20,7 @@
 using System;
 using System.Collections.Generic;
 using System.Globalization;
-using Org.Apache.REEF.Common.Context;
+using System.Linq;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Runtime.Evaluator.Task;
 using Org.Apache.REEF.Common.Tasks;
@@ -30,7 +30,7 @@ using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
 {
-    public class ContextRuntime
+    internal sealed class ContextRuntime
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(ContextRuntime));
         // Context-local injector. This contains information that will not be 
available in child injectors.
@@ -41,12 +41,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         // Convenience class to hold all the event handlers for the context as 
well as the service instances.
         private readonly ContextLifeCycle _contextLifeCycle;
 
+        // The parent context, if any.
+        private readonly Optional<ContextRuntime> _parentContext;
+
         // The child context, if any.
         private Optional<ContextRuntime> _childContext = 
Optional<ContextRuntime>.Empty();
 
-        // The parent context, if any.
-        private readonly Optional<ContextRuntime> _parentContext = 
Optional<ContextRuntime>.Empty();
-
         // The currently running task, if any.
         private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty();
 
@@ -63,11 +63,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 IConfiguration contextConfiguration,
                 Optional<ContextRuntime> parentContext)
         {
-            ContextConfiguration config = contextConfiguration as 
ContextConfiguration;
+            var config = contextConfiguration as ContextConfiguration;
             if (config == null)
             {
-                var e = new ArgumentException("contextConfiguration is not of 
type ContextConfiguration");
-                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                Utilities.Diagnostics.Exceptions.Throw(
+                    new ArgumentException("contextConfiguration is not of type 
ContextConfiguration"), LOGGER);
             }
             _contextLifeCycle = new ContextLifeCycle(config.Id);
             _serviceInjector = serviceInjector;
@@ -78,14 +78,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
             }
             catch (Exception e)
             {
-                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Error, LOGGER);
+                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, 
LOGGER);
 
                 Optional<string> parentId = ParentContext.IsPresent() ?
                     Optional<string>.Of(ParentContext.Value.Id) :
                     Optional<string>.Empty();
                 ContextClientCodeException ex = new 
ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration),
 parentId, "Unable to spawn context", e);
                 
-                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, 
LOGGER);
+                Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
             }
             // Trigger the context start events on contextInjector.
             _contextLifeCycle.Start();
@@ -124,40 +124,40 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// <returns>a child context.</returns>
         public ContextRuntime SpawnChildContext(IConfiguration 
contextConfiguration, IConfiguration serviceConfiguration)
         {
-            ContextRuntime childContext = null;
             lock (_contextLifeCycle)
             {
                 if (_task.IsPresent())
                 {
+                    // note: java code is putting thread id here
                     var e = new InvalidOperationException(
-                        string.Format(CultureInfo.InvariantCulture, 
"Attempting to spawn a child context when an Task with id '{0}' is running", 
_task.Value.TaskId)); // note: java code is putting thread id here
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                        string.Format(CultureInfo.InvariantCulture, 
"Attempting to spawn a child context when an Task with id '{0}' is running", 
_task.Value.TaskId));
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 }
                 if (_childContext.IsPresent())
                 {
                     var e = new InvalidOperationException("Attempting to 
instantiate a child context on a context that is not the topmost active 
context.");
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 }
                 try
                 {
                     IInjector childServiceInjector = 
_serviceInjector.ForkInjector(serviceConfiguration);
-                    childContext = new ContextRuntime(childServiceInjector, 
contextConfiguration, Optional<ContextRuntime>.Of(this));
+                    var childContext = new 
ContextRuntime(childServiceInjector, contextConfiguration, 
Optional<ContextRuntime>.Of(this));
                     _childContext = Optional<ContextRuntime>.Of(childContext);
                     return childContext;
                 }
                 catch (Exception e)
                 {
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Error, LOGGER);
+                    Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, 
LOGGER);
 
                     Optional<string> parentId = ParentContext.IsPresent() ?
                         Optional<string>.Of(ParentContext.Value.Id) :
                         Optional<string>.Empty();
                     ContextClientCodeException ex = new 
ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration),
 parentId, "Unable to spawn context", e);
                     
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, 
LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
                 }
             }
-            return childContext;
+            return null;
         }
 
         /// <summary>
@@ -175,12 +175,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 {
                     var e = new InvalidOperationException(
                         string.Format(CultureInfo.InvariantCulture, 
"Attempting to spawn a child context when an Task with id '{0}' is running", 
_task.Value.TaskId)); // note: java code is putting thread id here
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 }
                 if (_childContext.IsPresent())
                 {
                     var e = new InvalidOperationException("Attempting to 
instantiate a child context on a context that is not the topmost active 
context.");
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 }
                 IInjector childServiceInjector = 
_serviceInjector.ForkInjector();
                 ContextRuntime childContext = new 
ContextRuntime(childServiceInjector, contextConfiguration, 
Optional<ContextRuntime>.Of(this));
@@ -199,45 +199,44 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         {
             lock (_contextLifeCycle)
             {
-                bool taskPresent = _task.IsPresent();
-                bool taskEnded = taskPresent && _task.Value.HasEnded();
+                LOGGER.Log(Level.Info, 
"ContextRuntime::StartTask(TaskConfiguration) task is present: " + 
_task.IsPresent());
 
-                LOGGER.Log(Level.Info, 
"ContextRuntime::StartTask(TaskConfiguration)" + "task is present: " + 
taskPresent + " task has ended: " + taskEnded);
-                if (taskPresent)
+                if (_task.IsPresent())
                 {
                     LOGGER.Log(Level.Info, "Task state: " + 
_task.Value.GetTaskState());
-                }
+                    LOGGER.Log(Level.Info, 
"ContextRuntime::StartTask(TaskConfiguration) task has ended: " + 
_task.Value.HasEnded());
 
-                if (taskEnded)
-                {
-                    // clean up state
-                    _task = Optional<TaskRuntime>.Empty();
-                    taskPresent = false;
-                }
-                if (taskPresent)
-                {
-                    var e = new InvalidOperationException(
-                        string.Format(CultureInfo.InvariantCulture, 
"Attempting to spawn a child context when an Task with id '{0}' is running", 
_task.Value.TaskId)); // note: java code is putting thread id here
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    if (_task.Value.HasEnded())
+                    {
+                        // clean up state
+                        _task = Optional<TaskRuntime>.Empty();
+                    }
+                    else
+                    {
+                        // note: java code is putting thread id here
+                        var e = new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, 
"Attempting to spawn a child context when an Task with id '{0}' is running", 
_task.Value.TaskId));
+                        Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                    }
                 }
+
                 if (_childContext.IsPresent())
                 {
                     var e = new InvalidOperationException("Attempting to 
instantiate a child context on a context that is not the topmost active 
context.");
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 }
                 try
                 {
                     IInjector taskInjector = 
_contextInjector.ForkInjector(taskConfiguration.TangConfig);
                     LOGGER.Log(Level.Info, "Trying to inject task with 
configuration" + taskConfiguration.ToString());
-                    TaskRuntime taskRuntime = new TaskRuntime(taskInjector, 
contextId, taskConfiguration.TaskId, heartBeatManager); // 
taskInjector.getInstance(TaskRuntime.class);
-                    taskRuntime.Initialize();
-                    System.Threading.Tasks.Task.Run(new 
Action(taskRuntime.Start));                    
+                    TaskRuntime taskRuntime = new TaskRuntime(taskInjector, 
contextId, taskConfiguration.TaskId, heartBeatManager);
+                    taskRuntime.RunTask();
                     _task = Optional<TaskRuntime>.Of(taskRuntime);
                 }
                 catch (Exception e)
                 {
                     var ex = new 
TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate 
the new task", e);
-                    
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, 
Level.Error, "Task start error.", LOGGER);
+                    Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, 
Level.Error, "Task start error.", LOGGER);
                 }
             }
         }
@@ -282,11 +281,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 if (!_task.IsPresent())
                 {
                     LOGGER.Log(Level.Warning, "Received a suspend task while 
there was no task running. Ignored");
+                    return;
                 }
-                else
-                {
-                    _task.Value.Suspend(message);
-                }
+                _task.Value.Suspend(message);
             }
         }
 
@@ -303,11 +300,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 if (!_task.IsPresent())
                 {
                     LOGGER.Log(Level.Warning, "Received a close task while 
there was no task running. Ignored");
+                    return;
                 }
-                else
-                {
-                    _task.Value.Close(message);
-                }
+                _task.Value.Close(message);
             }
         }
 
@@ -324,11 +319,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 if (!_task.IsPresent())
                 {
                     LOGGER.Log(Level.Warning, "Received an task message while 
there was no task running. Ignored");
+                    return;
                 }
-                else
-                {
-                    _task.Value.Deliver(message);
-                }
+                _task.Value.Deliver(message);
             }
         }
 
@@ -351,30 +344,27 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         {
             lock (_contextLifeCycle)
             {
-                if (_task.IsPresent())
+                if (!_task.IsPresent())
                 {
-                    if (_task.Value.HasEnded())
-                    {
-                        _task = Optional<TaskRuntime>.Empty();
-                        return Optional<TaskStatusProto>.Empty();
-                    }
-                    else
-                    {
-                        TaskStatusProto taskStatusProto = 
_task.Value.GetStatusProto();
-                        if (taskStatusProto.state == State.RUNNING)
-                        {
-                            // only RUNNING status is allowed to rurn here, 
all other state pushed out to heartbeat 
-                            return 
Optional<TaskStatusProto>.Of(taskStatusProto);
-                        }
-                        var e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task 
state must be RUNNING, but instead is in {0} state", taskStatusProto.state));
-                        
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
-                        return Optional<TaskStatusProto>.Empty();
-                    }
+                    return Optional<TaskStatusProto>.Empty();
                 }
-                else
+
+                if (_task.Value.HasEnded())
                 {
+                    _task = Optional<TaskRuntime>.Empty();
                     return Optional<TaskStatusProto>.Empty();
                 }
+
+                var taskStatusProto = _task.Value.GetStatusProto();
+                if (taskStatusProto.state == State.RUNNING)
+                {
+                    // only RUNNING status is allowed to rurn here, all other 
state pushed out to heartbeat 
+                    return Optional<TaskStatusProto>.Of(taskStatusProto);
+                }
+
+                var e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task 
state must be RUNNING, but instead is in {0} state", taskStatusProto.state));
+                Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                return Optional<TaskStatusProto>.Empty();
             }
         }
 
@@ -389,11 +379,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 {
                     _childContext = Optional<ContextRuntime>.Empty();
                 }
-                else
-                {
-                    var e = new InvalidOperationException("no child context 
set");
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
-                }
+                Utilities.Diagnostics.Exceptions.Throw(new 
InvalidOperationException("no child context set"), LOGGER);
             }
         }
 
@@ -405,7 +391,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         {
             lock (_contextLifeCycle)
             {
-                ContextStatusProto contextStatusProto = new 
ContextStatusProto()
+                var contextStatusProto = new ContextStatusProto
                 {
                     context_id = Id,
                     context_state = _contextState,
@@ -415,20 +401,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                     contextStatusProto.parent_id = _parentContext.Value.Id;
                 }
 
-                foreach (IContextMessageSource source in 
_contextLifeCycle.ContextMessageSources)
+                foreach (var sourceMessage in 
_contextLifeCycle.ContextMessageSources.Where(src => 
src.Message.IsPresent()).Select(src => src.Message.Value))
                 {
-                    Optional<ContextMessage> contextMessageOptional = 
source.Message;
-                    if (contextMessageOptional.IsPresent())
+                    var contextMessageProto = new 
ContextStatusProto.ContextMessageProto
                     {
-                        ContextStatusProto.ContextMessageProto 
contextMessageProto
-                            = new ContextStatusProto.ContextMessageProto()
-                            {
-                                source_id = 
contextMessageOptional.Value.MessageSourceId,
-                            };
-                        contextMessageProto.message = 
ByteUtilities.CopyBytesFrom(contextMessageOptional.Value.Bytes);
-                        
contextStatusProto.context_message.Add(contextMessageProto);
-                    }
+                        source_id = sourceMessage.MessageSourceId,
+                        message = 
ByteUtilities.CopyBytesFrom(sourceMessage.Bytes),
+                    };
+                    
contextStatusProto.context_message.Add(contextMessageProto);
                 }
+
                 return contextStatusProto;
             }
         }
@@ -461,52 +443,3 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         }
     }
 }
-        ////<summary>
-        ////TODO: remove and use parameterless GetContextStatus above
-        ////</summary>
-        ////<returns>this context's status in protocol buffer form.</returns>
-        ////public ContextStatusProto GetContextStatus(string contextId)
-        ////{
-        ////   ContextStatusProto contextStatusProto = new ContextStatusProto()
-        ////   {
-        ////       context_id = contextId,
-        ////       context_state = _contextState,
-        ////   };
-        ////   return contextStatusProto;
-        ////}
-
-        ////TODO: remove and use injection
-        ////public void StartTask(ITask task, HeartBeatManager 
heartBeatManager, string taskId, string contextId)
-        ////{
-        ////  lock (_contextLifeCycle)
-        ////  {
-        ////      if (_task.IsPresent() && _task.Value.HasEnded())
-        ////      {
-        ////          // clean up state
-        ////          _task = Optional<TaskRuntime>.Empty();
-        ////      }
-        ////      if (_task.IsPresent())
-        ////      {
-        ////          throw new InvalidOperationException(
-        ////              string.Format(CultureInfo.InvariantCulture, 
"Attempting to spawn a child context when an Task with id '{0}' is running", 
_task.Value.TaskId)); // note: java code is putting thread id here
-        ////      }
-        ////      if (_childContext.IsPresent())
-        ////      {
-        ////          throw new InvalidOperationException("Attempting to 
instantiate a child context on a context that is not the topmost active 
context.");
-        ////      }
-        ////      try
-        ////      {
-        ////          // final Injector taskInjector = 
contextInjector.forkInjector(taskConfiguration);
-        ////          TaskRuntime taskRuntime  // 
taskInjector.getInstance(TaskRuntime.class);
-        ////              = new TaskRuntime(task, heartBeatManager);
-        ////          LOGGER.Log(Level.Info, 
string.Format(CultureInfo.InvariantCulture, "Starting task '{0}'", taskId));
-        ////          taskRuntime.Initialize(taskId, contextId);
-        ////          taskRuntime.Start();
-        ////          _task = Optional<TaskRuntime>.Of(taskRuntime);
-        ////      }
-        ////      catch (Exception e)
-        ////      {
-        ////          throw new InvalidOperationException("Unable to 
instantiate the new task");
-        ////      }
-        ////   }
-        ////}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/ca358730/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs
 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs
index 151ce0e..8fd52eb 100644
--- 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs
+++ 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs
@@ -56,7 +56,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
             set { _rootContextConfiguration = value; }
         }
 
-        public ContextRuntime GetRootContext()
+        internal ContextRuntime GetRootContext()
         {
             if (_rootContext == null)
             {

http://git-wip-us.apache.org/repos/asf/reef/blob/ca358730/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index b695e95..a60841f 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -20,6 +20,7 @@
 using System;
 using System.Collections.Generic;
 using System.Globalization;
+using System.Threading;
 using Org.Apache.REEF.Common.Io;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Tasks;
@@ -31,58 +32,39 @@ using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 {
-    public class TaskRuntime : IObserver<ICloseEvent>, 
IObserver<ISuspendEvent>, IObserver<IDriverMessage>
+    internal sealed class TaskRuntime : IObserver<ICloseEvent>, 
IObserver<ISuspendEvent>, IObserver<IDriverMessage>
     {
-        private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(TaskRuntime));
-        
-        private readonly ITask _task;
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TaskRuntime));
 
         private readonly IInjector _injector;
-
-        // The memento given by the task configuration
-        private readonly Optional<byte[]> _memento;
-
-        private readonly HeartBeatManager _heartBeatManager;
-
         private readonly TaskStatus _currentStatus;
-
-        private readonly INameClient _nameClient;
-
         private readonly Lazy<IDriverConnectionMessageHandler> 
_driverConnectionMessageHandler;
+        private readonly Lazy<IDriverMessageHandler> _driverMessageHandler;
+        private int taskRan = 0;
 
-        public TaskRuntime(IInjector taskInjector, string contextId, string 
taskId, HeartBeatManager heartBeatManager, string memento = null)
+        public TaskRuntime(IInjector taskInjector, string contextId, string 
taskId, HeartBeatManager heartBeatManager)
         {
             _injector = taskInjector;
-            _heartBeatManager = heartBeatManager;
 
-            Optional<ISet<ITaskMessageSource>> messageSources = 
Optional<ISet<ITaskMessageSource>>.Empty();
-            try
-            {
-                _task = _injector.GetInstance<ITask>();
-            }
-            catch (Exception e)
-            {
-                
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new 
InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to 
inject task.", LOGGER);
-            }
+            var messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
             try
             {
                 ITaskMessageSource taskMessageSource = 
_injector.GetInstance<ITaskMessageSource>();
-                messageSources = Optional<ISet<ITaskMessageSource>>.Of(new 
HashSet<ITaskMessageSource>() { taskMessageSource });
+                messageSources = Optional<ISet<ITaskMessageSource>>.Of(new 
HashSet<ITaskMessageSource> { taskMessageSource });
             }
             catch (Exception e)
             {
-                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, 
LOGGER);
+                Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, 
"Cannot inject task message source with error: " + e.StackTrace, Logger);
                 // do not rethrow since this is benign
             }
             try
             {
-                _nameClient = _injector.GetInstance<INameClient>();
-                _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
+                heartBeatManager.EvaluatorSettings.NameClient = 
_injector.GetInstance<INameClient>();
             }
             catch (InjectionException)
             {
-                LOGGER.Log(Level.Warning, "Cannot inject name client from task 
configuration.");
                 // do not rethrow since user is not required to provide name 
client
+                Logger.Log(Level.Warning, "Cannot inject name client from task 
configuration.");
             }
 
             _driverConnectionMessageHandler = new 
Lazy<IDriverConnectionMessageHandler>(() =>
@@ -93,16 +75,28 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 }
                 catch (InjectionException)
                 {
-                    LOGGER.Log(Level.Info, "User did not implement 
IDriverConnectionMessageHandler.");
+                    Logger.Log(Level.Info, "User did not implement 
IDriverConnectionMessageHandler.");
+                }
+
+                return null;
+            });
+
+            _driverMessageHandler = new Lazy<IDriverMessageHandler>(() =>
+            {
+                try
+                {
+                    return _injector.GetInstance<IDriverMessageHandler>();
+                }
+                catch (InjectionException ie)
+                {
+                    Utilities.Diagnostics.Exceptions.CaughtAndThrow(ie, 
Level.Error, "Received Driver message, but unable to inject handler for driver 
message ", Logger);
                 }
 
                 return null;
             });
 
-            LOGGER.Log(Level.Info, "task message source injected");
-            _currentStatus = new TaskStatus(_heartBeatManager, contextId, 
taskId, messageSources);
-            _memento = memento == null ?
-                Optional<byte[]>.Empty() : 
Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
+            Logger.Log(Level.Info, "task message source injected");
+            _currentStatus = new TaskStatus(heartBeatManager, contextId, 
taskId, messageSources);
         }
 
         public string TaskId
@@ -115,45 +109,65 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             get { return _currentStatus.ContextId; }
         }
 
-        public void Initialize()
-        {
-            _currentStatus.SetRunning();
-        }
-
         /// <summary>
-        /// Run the task
+        /// Runs the task asynchronously.
         /// </summary>
-        public void Start()
+        public void RunTask()
         {
-            try
+            if (Interlocked.Exchange(ref taskRan, 1) != 0)
             {
-                LOGGER.Log(Level.Info, "Call Task");
-                if (_currentStatus.IsNotRunning())
-                {
-                    var e = new InvalidOperationException("TaskRuntime not in 
Running state, instead it is in state " + _currentStatus.State);
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
-                }
-
-                byte[] taskMemento = _memento.IsPresent() ? _memento.Value : 
null;
-                var result = RunTask(taskMemento);
-
-                LOGGER.Log(Level.Info, "Task Call Finished");
-                _currentStatus.SetResult(result);
-                if (result != null && result.Length > 0)
-                {
-                    LOGGER.Log(Level.Info, "Task running result:\r\n" + 
System.Text.Encoding.Default.GetString(result));
-                }
+                // Return if we have already called RunTask
+                throw new InvalidOperationException("TaskRun has already been 
called on TaskRuntime.");
             }
-            catch (Exception e)
+
+            _currentStatus.SetRunning();
+            ITask userTask;
+            try
             {
-                LOGGER.Log(Level.Warning,
-                    string.Format(CultureInfo.InvariantCulture, "Task failed 
caused by exception [{0}]", e));
-                _currentStatus.SetException(e);
+                userTask = _injector.GetInstance<ITask>();
             }
-            finally
+            catch (Exception e)
             {
-                _task.Dispose();
+                Utilities.Diagnostics.Exceptions.CaughtAndThrow(new 
InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to 
inject task.", Logger);
+                return;
             }
+
+            System.Threading.Tasks.Task.Run(() => 
userTask.Call(null)).ContinueWith(
+                runTask =>
+                {
+                    try
+                    {
+                        // Task failed.
+                        if (runTask.IsFaulted)
+                        {
+                            Logger.Log(Level.Warning,
+                                string.Format(CultureInfo.InvariantCulture, 
"Task failed caused by exception [{0}]", runTask.Exception));
+                            _currentStatus.SetException(runTask.Exception);
+                            return;
+                        }
+
+                        if (runTask.IsCanceled)
+                        {
+                            Logger.Log(Level.Warning,
+                                string.Format(CultureInfo.InvariantCulture, 
"Task failed caused by task cancellation"));
+                            return;
+                        }
+
+                        // Task completed.
+                        var result = runTask.Result;
+                        Logger.Log(Level.Info, "Task Call Finished");
+                        _currentStatus.SetResult(result);
+                        if (result != null && result.Length > 0)
+                        {
+                            Logger.Log(Level.Info, "Task running result:\r\n" 
+ System.Text.Encoding.Default.GetString(result));
+                        }
+                    }
+                    finally
+                    {
+                        userTask.Dispose();
+                        runTask.Dispose();
+                    }
+                });
         }
 
         public TaskState GetTaskState()
@@ -175,60 +189,45 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             return _currentStatus.HasEnded();
         }
 
-        /// <summary>
-        /// get ID of the task.
-        /// </summary>
-        /// <returns>ID of the task.</returns>
-        public string GetActicityId()
-        {
-            return _currentStatus.TaskId;
-        }
-
         public void Close(byte[] message)
         {
-            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
+            Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
             if (_currentStatus.IsNotRunning())
             {
-                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
+                Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
+                return;
             }
-            else
+            try
             {
-                try
-                {
-                    OnNext(new CloseEventImpl(message));
-                    _currentStatus.SetCloseRequested();
-                }
-                catch (Exception e)
-                {
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER);
-
-                    _currentStatus.SetException(
-                        new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
-                }
+                OnNext(new CloseEventImpl(message));
+                _currentStatus.SetCloseRequested();
+            }
+            catch (Exception e)
+            {
+                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", Logger);
+                _currentStatus.SetException(new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
             }
         }
 
         public void Suspend(byte[] message)
         {
-            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));
-            
+            Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));
+
             if (_currentStatus.IsNotRunning())
             {
-                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
+                Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
+                return;
             }
-            else
+            try
             {
-                try
-                {
-                    OnNext(new SuspendEventImpl(message));
-                    _currentStatus.SetSuspendRequested();
-                }
-                catch (Exception e)
-                {
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER);
-                    _currentStatus.SetException(
-                        new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
-                }
+                OnNext(new SuspendEventImpl(message));
+                _currentStatus.SetSuspendRequested();
+            }
+            catch (Exception e)
+            {
+                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", Logger);
+                _currentStatus.SetException(
+                    new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
             }
         }
 
@@ -236,88 +235,49 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         {
             if (_currentStatus.IsNotRunning())
             {
-                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
+                Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
+                return;
             }
-            else
+            try
             {
-                try
-                {
-                    OnNext(new DriverMessageImpl(message));
-                }
-                catch (Exception e)
-                {
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER);
-                    _currentStatus.SetException(
-                        new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
-                }
+                OnNext(new DriverMessageImpl(message));
+            }
+            catch (Exception e)
+            {
+                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", Logger);
+                _currentStatus.SetException(
+                    new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
             }
         }
 
         public void OnNext(ICloseEvent value)
         {
-            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
+            Logger.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
             // TODO: send a heartbeat
         }
 
-        void IObserver<ICloseEvent>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<IDriverMessage>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<IDriverMessage>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<ISuspendEvent>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<ISuspendEvent>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<ICloseEvent>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
         public void OnNext(ISuspendEvent value)
         {
-            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
+            Logger.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
             // TODO: send a heartbeat
         }
 
         public void OnNext(IDriverMessage value)
         {
-            IDriverMessageHandler messageHandler = null;
-            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
-            try
+            Logger.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
+
+            if (_driverMessageHandler.Value == null)
             {
-                messageHandler = _injector.GetInstance<IDriverMessageHandler>();
+                return;
             }
-            catch (Exception e)
+            try
             {
-                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
+                _driverMessageHandler.Value.Handle(value);
             }
-            if (messageHandler != null)
+            catch (Exception e)
             {
-                try
-                {
-                    messageHandler.Handle(value);
-                }
-                catch (Exception e)
-                {
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER);
-                    _currentStatus.RecordExecptionWithoutHeartbeat(e);
-                }
+                Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, Logger);
+                _currentStatus.RecordExecptionWithoutHeartbeat(e);
             }
         }
 
@@ -334,9 +294,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             _driverConnectionMessageHandler.Value.OnNext(message);
         }
 
-        private byte[] RunTask(byte[] memento)
+        public void OnError(Exception error)
         {
-            return _task.Call(memento);
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
         }
     }
 }
\ No newline at end of file

Reply via email to