Repository: reef
Updated Branches:
  refs/heads/master 372223469 -> 30a1dac46


[REEF-1039] Clean up evaluator runtime and context management

This addressed the issue by
  * Cleaned up incorrect usage of Task.Start.
  * Removed usage of Stack in ContextManager.
  * Cleaned up style issues.

JIRA:
  [REEF-1039](https://issues.apache.org/jira/browse/REEF-1039)

Pull request:
  This closes #701


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/30a1dac4
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/30a1dac4
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/30a1dac4

Branch: refs/heads/master
Commit: 30a1dac46187d11c6567e951de9b7af4ea73c084
Parents: 3722234
Author: Andrew Chung <[email protected]>
Authored: Thu Dec 3 15:08:23 2015 -0800
Committer: Dongjoon Hyun <[email protected]>
Committed: Sat Dec 5 22:25:28 2015 +0900

----------------------------------------------------------------------
 .../Runtime/Evaluator/Context/ContextManager.cs | 150 ++++++++++---------
 .../Runtime/Evaluator/Context/ContextRuntime.cs |  21 ++-
 .../Runtime/Evaluator/EvaluatorRuntime.cs       |  91 ++++-------
 lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs  | 144 +++++++-----------
 4 files changed, 185 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/30a1dac4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
index ac27723..b054d30 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs
@@ -31,15 +31,13 @@ using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
 {
-    public class ContextManager : IDisposable
+    public sealed class ContextManager : IDisposable
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(ContextManager));
-        
-        private readonly Stack<ContextRuntime> _contextStack = new 
Stack<ContextRuntime>();
-
         private readonly HeartBeatManager _heartBeatManager;
-
         private readonly RootContextLauncher _rootContextLauncher;
+        private readonly object _contextLock = new object();
+        private ContextRuntime _topContext = null;
 
         public ContextManager(HeartBeatManager heartBeatManager, 
Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> 
rootTaskConfig)
         {
@@ -55,22 +53,21 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// </summary>
         public void Start()
         {
-            lock (_contextStack)
+            lock (_contextLock)
             {
-                ContextRuntime rootContext = 
_rootContextLauncher.GetRootContext();
-                LOGGER.Log(Level.Info, 
string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id 
{0}", rootContext.Id));
-                _contextStack.Push(rootContext);
+                _topContext = _rootContextLauncher.GetRootContext();
+                LOGGER.Log(Level.Info, 
string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id 
{0}", _topContext.Id));
 
                 if (_rootContextLauncher.RootTaskConfig.IsPresent())
                 {
                     LOGGER.Log(Level.Info, "Launching the initial Task");
                     try
                     {
-                        
_contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, 
_rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
+                        
_topContext.StartTask(_rootContextLauncher.RootTaskConfig.Value, 
_rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
                     }
                     catch (TaskClientCodeException e)
                     {
-                        
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, 
"Exception when trying to start a task.", LOGGER);
+                        Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Error, "Exception when trying to start a task.", LOGGER);
                         HandleTaskException(e);
                     }
                 }
@@ -79,9 +76,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
 
         public bool ContextStackIsEmpty()
         {
-            lock (_contextStack)
+            lock (_contextLock)
             {
-                return _contextStack.Count == 0;
+                return _topContext == null;
             }
         }
 
@@ -99,7 +96,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 byte[] message = controlMessage.task_message;
                 if (controlMessage.add_context != null && 
controlMessage.remove_context != null)
                 {
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new 
InvalidOperationException("Received a message with both add and remove context. 
This is unsupported."), LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(new 
InvalidOperationException("Received a message with both add and remove context. 
This is unsupported."), LOGGER);
                 }
                 if (controlMessage.add_context != null)
                 {
@@ -132,56 +129,67 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 else if (controlMessage.stop_task != null)
                 {
                     LOGGER.Log(Level.Info, "CloseTask");
-                    _contextStack.Peek().CloseTask(message);
+                    lock (_contextLock)
+                    {
+                        _topContext.CloseTask(message);
+                    }
                 }
                 else if (controlMessage.suspend_task != null)
                 {
                     LOGGER.Log(Level.Info, "SuspendTask");
-                    _contextStack.Peek().SuspendTask(message);
+                    lock (_contextLock)
+                    {
+                        _topContext.SuspendTask(message);
+                    }
                 }
                 else if (controlMessage.task_message != null)
                 {
                     LOGGER.Log(Level.Info, "DeliverTaskMessage");
-                    _contextStack.Peek().DeliverTaskMessage(message);
+                    lock (_contextLock)
+                    {
+                        _topContext.DeliverTaskMessage(message);
+                    }
                 }
                 else if (controlMessage.context_message != null)
                 {
                     LOGGER.Log(Level.Info, "Handle context contol message");
                     ContextMessageProto contextMessageProto = 
controlMessage.context_message;
-                    bool deliveredMessage = false;
-                    foreach (ContextRuntime context in _contextStack)
+                    ContextRuntime context = null;
+                    lock (_contextLock)
                     {
-                        if (context.Id.Equals(contextMessageProto.context_id))
+                        if (_topContext != null)
                         {
-                            LOGGER.Log(Level.Info, 
string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", 
controlMessage.context_message.message));
-                            
context.HandleContextMessaage(controlMessage.context_message.message);
-                            deliveredMessage = true;
-                            break;
+                            context = 
_topContext.GetContextStack().FirstOrDefault(ctx => 
ctx.Id.Equals(contextMessageProto.context_id));
                         }
                     }
-                    if (!deliveredMessage)
+
+                    if (context != null)
+                    {
+                        
context.HandleContextMessage(controlMessage.context_message.message);
+                    }
+                    else
                     {
-                        InvalidOperationException e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent 
message to unknown context {0}", contextMessageProto.context_id));
-                        
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                        var e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent 
message to unknown context {0}", contextMessageProto.context_id));
+                        Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                     }
                 }
                 else
                 {
                     InvalidOperationException e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown 
task control message: {0}", controlMessage.ToString()));
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 } 
             }
             catch (Exception e)
             {
                 if (e is TaskClientCodeException)
                 {
-                    HandleTaskException((TaskClientCodeException)e);
+                    HandleTaskException(e as TaskClientCodeException);
                 }
                 else if (e is ContextClientCodeException)
                 {
-                    HandlContextException((ContextClientCodeException)e);
+                    HandleContextException(e as ContextClientCodeException);
                 }
-                
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, 
LOGGER);
+                Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, 
Level.Error, LOGGER);
             }  
         }
 
@@ -191,13 +199,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// <returns>the TaskStatusProto of the currently running task, if 
there is any</returns>
         public Optional<TaskStatusProto> GetTaskStatus()
         {
-            if (_contextStack.Count == 0)
+            lock (_contextLock)
             {
-                return Optional<TaskStatusProto>.Empty();
-
-                // throw new InvalidOperationException("Asked for an Task 
status while there isn't even a context running.");
+                return _topContext == null ? Optional<TaskStatusProto>.Empty() 
: _topContext.GetTaskStatus();
             }
-            return _contextStack.Peek().GetTaskStatus();
         }
 
         /// <summary>
@@ -207,11 +212,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         public ICollection<ContextStatusProto> GetContextStatusCollection()
         {
             ICollection<ContextStatusProto> result = new 
Collection<ContextStatusProto>();
-            foreach (ContextRuntime runtime in _contextStack)
+            lock (_contextLock)
             {
-                ContextStatusProto contextStatusProto = 
runtime.GetContextStatus();
-                LOGGER.Log(Level.Verbose, 
string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", 
contextStatusProto));
-                result.Add(contextStatusProto);
+                if (_topContext != null)
+                {
+                    foreach (var contextStatusProto in 
_topContext.GetContextStack().Select(context => context.GetContextStatus()))
+                    {
+                        LOGGER.Log(Level.Verbose, 
string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", 
contextStatusProto));
+                        result.Add(contextStatusProto);
+                    }
+                }
             }
             return result;
         }
@@ -222,16 +232,13 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// </summary>
         public void Dispose()
         {
-            lock (_contextStack)
+            lock (_contextLock)
             {
-                if (_contextStack != null && _contextStack.Any())
+                if (_topContext != null)
                 {
                     LOGGER.Log(Level.Info, "context stack not empty, 
forcefully closing context runtime.");
-                    ContextRuntime runtime = _contextStack.Last();
-                    if (runtime != null)
-                    {
-                        runtime.Dispose();
-                    }
+                    _topContext.Dispose();
+                    _topContext = null;
                 }
             }
         }
@@ -241,7 +248,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// </summary>
         internal void HandleDriverConnectionMessage(IDriverConnectionMessage 
message)
         {
-            _contextStack.Peek().HandleDriverConnectionMessage(message);
+            lock (_contextLock)
+            {
+                _topContext.HandleDriverConnectionMessage(message);
+            }
         }
 
         /// <summary>
@@ -250,29 +260,29 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// <param name="addContextProto"></param>
         private void AddContext(AddContextProto addContextProto)
         {
-            lock (_contextStack)
+            lock (_contextLock)
             {
-                ContextRuntime currentTopContext = _contextStack.Peek();
+                var currentTopContext = _topContext;
                 if 
(!currentTopContext.Id.Equals(addContextProto.parent_context_id, 
StringComparison.OrdinalIgnoreCase))
                 {
                     var e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying 
to instantiate a child context on context with id '{0}' while the current top 
context id is {1}",
                         addContextProto.parent_context_id,
                         currentTopContext.Id));
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 }
                 string contextConfigString = 
addContextProto.context_configuration;
-                ContextConfiguration contextConfiguration = new 
ContextConfiguration(contextConfigString);
+                var contextConfiguration = new 
ContextConfiguration(contextConfigString);
                 ContextRuntime newTopContext;
                 if (addContextProto.service_configuration != null)
                 {
-                    ServiceConfiguration serviceConfiguration = new 
ServiceConfiguration(addContextProto.service_configuration);
+                    var serviceConfiguration = new 
ServiceConfiguration(addContextProto.service_configuration);
                     newTopContext = 
currentTopContext.SpawnChildContext(contextConfiguration, 
serviceConfiguration.TangConfig);
                 }
                 else
                 {
                     newTopContext = 
currentTopContext.SpawnChildContext(contextConfiguration);
                 }
-                _contextStack.Push(newTopContext);
+                _topContext = newTopContext;
             }
         }
 
@@ -282,23 +292,26 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// <param name="contextId"> context id</param>
         private void RemoveContext(string contextId)
         {
-            lock (_contextStack)
+            lock (_contextLock)
             {
-                string currentTopContextId = _contextStack.Peek().Id;
-                if (!contextId.Equals(_contextStack.Peek().Id, 
StringComparison.OrdinalIgnoreCase))
+                string currentTopContextId = _topContext.Id;
+                if (!contextId.Equals(_topContext.Id, 
StringComparison.OrdinalIgnoreCase))
                 {
                     var e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying 
to close context with id '{0}' while the top context id is {1}", contextId, 
currentTopContextId));
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 }
-                _contextStack.Peek().Dispose();
-                if (_contextStack.Count > 1)
+                var hasParentContext = _topContext.ParentContext.IsPresent();
+                _topContext.Dispose();
+                if (hasParentContext)
                 {
                     // We did not close the root context. Therefore, we need 
to inform the
                     // driver explicitly that this context is closed. The root 
context notification
                     // is implicit in the Evaluator close/done notification.
                     _heartBeatManager.OnNext(); // Ensure Driver gets notified 
of context DONE state
                 }
-                _contextStack.Pop();
+
+                // does not matter if null.
+                _topContext = _topContext.ParentContext.Value;
             }
             // System.gc(); // TODO: garbage collect?
         }
@@ -309,14 +322,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// <param name="startTaskProto"></param>
         private void StartTask(StartTaskProto startTaskProto)
         {
-            lock (_contextStack)
+            lock (_contextLock)
             {
-                ContextRuntime currentActiveContext = _contextStack.Peek();
+                ContextRuntime currentActiveContext = _topContext;
                 string expectedContextId = startTaskProto.context_id;
                 if (!expectedContextId.Equals(currentActiveContext.Id, 
StringComparison.OrdinalIgnoreCase))
                 {
-                    var e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task 
expected context '{0}' but the active context has Id '{1}'", expectedContextId, 
currentActiveContext.Id));
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, 
LOGGER);
+                    var e = new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Task 
expected context '{0}' but the active context has Id '{1}'", expectedContextId, 
currentActiveContext.Id));
+                    Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                 }
                 TaskConfiguration taskConfiguration = new 
TaskConfiguration(startTaskProto.configuration);
                 currentActiveContext.StartTask(taskConfiguration, 
expectedContextId, _heartBeatManager);
@@ -338,7 +352,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 result = exception,
                 state = State.FAILED
             };
-            LOGGER.Log(Level.Error, 
string.Format(CultureInfo.InvariantCulture, "Sending Heartbeatb for a failed 
task: {0}", taskStatus.ToString()));
+            LOGGER.Log(Level.Error, "Sending Heartbeat for a failed task: 
{0}", taskStatus);
             _heartBeatManager.OnNext(taskStatus);
         }
 
@@ -346,7 +360,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE 
HeartBeatManager
         /// </summary>
         /// <param name="e"></param>
-        private void HandlContextException(ContextClientCodeException e)
+        private void HandleContextException(ContextClientCodeException e)
         {
             LOGGER.Log(Level.Error, "ContextClientCodeException", e);
             byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
@@ -360,7 +374,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
             {
                 contextStatusProto.parent_id = e.ParentId.Value;
             }
-            LOGGER.Log(Level.Error, 
string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed 
context: {0}", contextStatusProto.ToString()));
+            LOGGER.Log(Level.Error, "Sending Heartbeat for a failed context: 
{0}", contextStatusProto);
             _heartBeatManager.OnNext(contextStatusProto);
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/30a1dac4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
index 1b8854c..8390df5 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
@@ -18,6 +18,7 @@
  */
 
 using System;
+using System.Collections.Generic;
 using System.Globalization;
 using Org.Apache.REEF.Common.Context;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
@@ -331,9 +332,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
             }
         }
 
-        public void HandleContextMessaage(byte[] mesage)
+        [Obsolete("Deprecated in 0.14, please use HandleContextMessage 
instead.")]
+        public void HandleContextMessaage(byte[] message)
         {
-            _contextLifeCycle.HandleContextMessage(mesage);
+            _contextLifeCycle.HandleContextMessage(message);
+        }
+
+        public void HandleContextMessage(byte[] message)
+        {
+            _contextLifeCycle.HandleContextMessage(message);
         }
 
         /// <summary>
@@ -442,6 +449,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
                 _task.Value.HandleDriverConnectionMessage(message);
             }
         }
+
+        internal IEnumerable<ContextRuntime> GetContextStack()
+        {
+            var context = Optional<ContextRuntime>.Of(this);
+            while (context.IsPresent())
+            {
+                yield return context.Value;
+                context = context.Value.ParentContext;
+            }
+        }
     }
 }
         ////<summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/30a1dac4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
index 51f3bd2..a076e5d 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
@@ -24,42 +24,35 @@ using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Time;
 using Org.Apache.REEF.Wake.Time.Runtime.Event;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator
 {
-    public class EvaluatorRuntime : IObserver<RuntimeStart>, 
IObserver<RuntimeStop>, IObserver<REEFMessage>
+    public sealed class EvaluatorRuntime : IObserver<RuntimeStart>, 
IObserver<RuntimeStop>, IObserver<REEFMessage>
     {
-        private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(EvaluatorRuntime));
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(EvaluatorRuntime));
         
         private readonly string _evaluatorId;
-
         private readonly ContextManager _contextManager;
-
         private readonly HeartBeatManager _heartBeatManager;
-
-        private readonly IRemoteManager<REEFMessage> _remoteManager;
-
         private readonly IClock _clock;
+        private readonly IDisposable _evaluatorControlChannel; 
 
         private State _state = State.INIT;
 
-        private readonly IDisposable _evaluatorControlChannel; 
-
         [Inject]
         public EvaluatorRuntime(
             ContextManager contextManager,
             HeartBeatManager heartBeatManager)
         {
-            using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
+            using (Logger.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
             {
                 _clock = heartBeatManager.EvaluatorSettings.RuntimeClock;
                 _heartBeatManager = heartBeatManager;
                 _contextManager = contextManager;
                 _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId;
-                _remoteManager = 
heartBeatManager.EvaluatorSettings.RemoteManager;
+                var remoteManager = 
heartBeatManager.EvaluatorSettings.RemoteManager;
 
                 ReefMessageProtoObserver driverObserver = new 
ReefMessageProtoObserver();
 
@@ -67,7 +60,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 driverObserver.Subscribe(o => OnNext(o.Message));
 
                 // register the driver observer
-                _evaluatorControlChannel = 
_remoteManager.RegisterObserver(driverObserver);
+                _evaluatorControlChannel = 
remoteManager.RegisterObserver(driverObserver);
 
                 // start the hearbeat
                 _clock.ScheduleAlarm(0, heartBeatManager);
@@ -86,7 +79,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
         {
             lock (_heartBeatManager)
             {
-                LOGGER.Log(Level.Info, "Handle Evaluator control message");
+                Logger.Log(Level.Info, "Handle Evaluator control message");
                 if (!message.identifier.Equals(_evaluatorId, 
StringComparison.OrdinalIgnoreCase))
                 {
                     Handle(new InvalidOperationException(
@@ -101,13 +94,13 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 {
                     if (message.context_control != null)
                     {
-                        LOGGER.Log(Level.Info, "Send task control message to 
ContextManager");
+                        Logger.Log(Level.Info, "Send task control message to 
ContextManager");
                         try
                         {
                             
_contextManager.HandleTaskControl(message.context_control);
                             if (_contextManager.ContextStackIsEmpty() && 
_state == State.RUNNING)
                             {
-                                LOGGER.Log(Level.Info, "Context stack is 
empty, done");
+                                Logger.Log(Level.Info, "Context stack is 
empty, done");
                                 _state = State.DONE;
                                 _heartBeatManager.OnNext(GetEvaluatorStatus());
                                 _clock.Dispose();
@@ -115,14 +108,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                         }
                         catch (Exception e)
                         {
-                            
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                            Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Error, Logger);
                             Handle(e);
-                            
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new 
InvalidOperationException(e.ToString(), e), LOGGER);
+                            Utilities.Diagnostics.Exceptions.Throw(new 
InvalidOperationException(e.ToString(), e), Logger);
                         }
                     }
                     if (message.kill_evaluator != null)
                     {
-                        LOGGER.Log(Level.Info, 
string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by 
the driver.", _evaluatorId));
+                        Logger.Log(Level.Info, 
string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by 
the driver.", _evaluatorId));
                         _state = State.KILLED;
                         _clock.Dispose();
                     }
@@ -132,8 +125,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
 
         public EvaluatorStatusProto GetEvaluatorStatus()
         {
-            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, 
"Evaluator state : {0}", _state));
-            EvaluatorStatusProto evaluatorStatusProto = new 
EvaluatorStatusProto()
+            Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, 
"Evaluator state : {0}", _state));
+            EvaluatorStatusProto evaluatorStatusProto = new 
EvaluatorStatusProto
             {
                 evaluator_id = _evaluatorId,
                 state = _state
@@ -147,11 +140,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             {
                 try
                 {
-                    LOGGER.Log(Level.Info, "Runtime start");
+                    Logger.Log(Level.Info, "Runtime start");
                     if (_state != State.INIT)
                     {
                         var e = new InvalidOperationException("State should be 
init.");
-                        
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                        Utilities.Diagnostics.Exceptions.Throw(e, Logger);
                     }
                     _state = State.RUNNING;
                     _contextManager.Start();
@@ -159,45 +152,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 }
                 catch (Exception e)
                 {
-                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Error, LOGGER);
+                    Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, 
Logger);
                     Handle(e);
                 }
             }
         }
 
-        void IObserver<RuntimeStart>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<REEFMessage>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<REEFMessage>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<RuntimeStop>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<RuntimeStop>.OnError(Exception error)
-        {
-            throw new NotImplementedException();
-        }
-
-        void IObserver<RuntimeStart>.OnCompleted()
-        {
-            throw new NotImplementedException();
-        }
-
         public void OnNext(RuntimeStop runtimeStop)
         {
-            LOGGER.Log(Level.Info, "Runtime stop");
+            Logger.Log(Level.Info, "Runtime stop");
             _contextManager.Dispose();
 
             if (_state == State.RUNNING)
@@ -211,16 +174,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             }
             catch (Exception e)
             {
-                
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new 
InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, 
"Exception during shut down.", LOGGER);
+                Utilities.Diagnostics.Exceptions.CaughtAndThrow(new 
InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, 
"Exception during shut down.", Logger);
             }
-            LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete");      
  
+            Logger.Log(Level.Info, "EvaluatorRuntime shutdown complete");      
  
         }
 
         public void OnNext(REEFMessage value)
         {
             if (value != null && value.evaluatorControl != null)
             {
-                LOGGER.Log(Level.Info, "Received a REEFMessage with 
EvaluatorControl");
+                Logger.Log(Level.Info, "Received a REEFMessage with 
EvaluatorControl");
                 Handle(value.evaluatorControl);
             }
         }
@@ -229,7 +192,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
         {
             lock (_heartBeatManager)
             {
-                LOGGER.Log(Level.Error, 
string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with 
exception", _evaluatorId), e);
+                Logger.Log(Level.Error, 
string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with 
exception", _evaluatorId), e);
                 _state = State.FAILED;
                 string errorMessage = string.Format(
                         CultureInfo.InvariantCulture,
@@ -247,5 +210,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 _contextManager.Dispose();
             }       
         }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/30a1dac4/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs 
b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs
index 0ffce12..f504e77 100644
--- a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs
+++ b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs
@@ -21,12 +21,9 @@ using System;
 using System.Collections.Generic;
 using System.Configuration;
 using System.Diagnostics;
-using System.Globalization;
 using System.IO;
 using System.Linq;
-using System.Text;
 using System.Threading;
-using System.Threading.Tasks;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Runtime.Evaluator;
 using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
@@ -45,41 +42,34 @@ using Org.Apache.REEF.Wake.Remote;
 using Org.Apache.REEF.Wake.Remote.Impl;
 using Org.Apache.REEF.Wake.Time.Runtime;
 using Org.Apache.REEF.Wake.Time.Runtime.Event;
-using Org.Apache.REEF.Wake.Util;
 
 namespace Org.Apache.REEF.Evaluator
 {
     public sealed class Evaluator
     {
-        private static Logger _logger = Logger.GetLogger(typeof(Evaluator));
-
-        private static int _heartbeatPeriodInMs = 
Constants.DefaultEvaluatorHeartbeatPeriodInMs;
-
-        private static int _heartbeatMaxRetry = 
Constants.DefaultEvaluatorHeartbeatMaxRetry;
-
-        private static IInjector _injector;
-
-        private static EvaluatorConfigurations _evaluatorConfig;
+        private static Logger logger = Logger.GetLogger(typeof(Evaluator));
+        private static int heartbeatPeriodInMs = 
Constants.DefaultEvaluatorHeartbeatPeriodInMs;
+        private static int heartbeatMaxRetry = 
Constants.DefaultEvaluatorHeartbeatMaxRetry;
+        private static IInjector injector;
+        private static EvaluatorConfigurations evaluatorConfig;
 
         public static void Main(string[] args)
         {
-            
             try
             {
-                Console.WriteLine(string.Format(CultureInfo.InvariantCulture, 
"START: {0} Evaluator::InitInjector.",
-                    DateTime.Now));
+                Console.WriteLine("START: {0} Evaluator::InitInjector.", 
DateTime.Now);
                 Stopwatch timer = new Stopwatch();
                 InitInjector();
-                SetCustomTraceListeners();  // _logger is reset by this.
+                SetCustomTraceListeners();  // logger is reset by this.
                 timer.Stop();
-                Console.WriteLine(string.Format(CultureInfo.InvariantCulture,
-                    "EXIT: {0} Evaluator::InitInjector. Duration: [{1}].", 
DateTime.Now, timer.Elapsed));
-
+                Console.WriteLine("EXIT: {0} Evaluator::InitInjector. 
Duration: [{1}].", DateTime.Now, timer.Elapsed);
                 
-                using (_logger.LogScope("Evaluator::Main"))
+                using (logger.LogScope("Evaluator::Main"))
                 {
-                    // Wait for the debugger, if enabled
-                    AttachDebuggerIfEnabled();
+                    if (IsDebuggingEnabled())
+                    {
+                        AttachDebugger();
+                    }
 
                     // Register our exception handler
                     AppDomain.CurrentDomain.UnhandledException += 
UnhandledExceptionHandler;
@@ -93,40 +83,38 @@ namespace Org.Apache.REEF.Evaluator
                     if (args.Count() != 1)
                     {
                         var e = new InvalidOperationException("must supply 
only the evaluator config file!");
-                        Utilities.Diagnostics.Exceptions.Throw(e, _logger);
+                        Utilities.Diagnostics.Exceptions.Throw(e, logger);
                     }
 
                     // evaluator configuration file
                     string evaluatorConfigurationPath = args[0];
 
                     // Parse the evaluator configuration.
-                    _evaluatorConfig = new 
EvaluatorConfigurations(evaluatorConfigurationPath);
+                    evaluatorConfig = new 
EvaluatorConfigurations(evaluatorConfigurationPath);
 
-                    string rId = _evaluatorConfig.ErrorHandlerRID;
-                    ContextConfiguration rootContextConfiguration = 
_evaluatorConfig.RootContextConfiguration;
-                    Optional<TaskConfiguration> rootTaskConfig = 
_evaluatorConfig.TaskConfiguration;
-                    Optional<ServiceConfiguration> rootServiceConfig = 
_evaluatorConfig.RootServiceConfiguration;
+                    string rId = evaluatorConfig.ErrorHandlerRID;
+                    ContextConfiguration rootContextConfiguration = 
evaluatorConfig.RootContextConfiguration;
+                    Optional<TaskConfiguration> rootTaskConfig = 
evaluatorConfig.TaskConfiguration;
+                    Optional<ServiceConfiguration> rootServiceConfig = 
evaluatorConfig.RootServiceConfiguration;
 
                     // remoteManager used as client-only in evaluator
-                    IRemoteManager<REEFMessage> remoteManager = 
_injector.GetInstance<IRemoteManagerFactory>().GetInstance(new 
REEFMessageCodec());
+                    IRemoteManager<REEFMessage> remoteManager = 
injector.GetInstance<IRemoteManagerFactory>().GetInstance(new 
REEFMessageCodec());
                     IRemoteIdentifier remoteId = new 
SocketRemoteIdentifier(NetUtilities.ParseIpEndpoint(rId));
 
-
                     RuntimeClock clock = InstantiateClock();
-                    _logger.Log(Level.Info, "Application Id: " + 
_evaluatorConfig.ApplicationId);
+                    logger.Log(Level.Info, "Application Id: " + 
evaluatorConfig.ApplicationId);
                     EvaluatorSettings evaluatorSettings = new 
EvaluatorSettings(
-                        _evaluatorConfig.ApplicationId,
-                        _evaluatorConfig.EvaluatorId,
-                        _heartbeatPeriodInMs,
-                        _heartbeatMaxRetry,
+                        evaluatorConfig.ApplicationId,
+                        evaluatorConfig.EvaluatorId,
+                        heartbeatPeriodInMs,
+                        heartbeatMaxRetry,
                         rootContextConfiguration,
                         clock,
                         remoteManager,
-                        _injector);
+                        injector);
 
                     HeartBeatManager heartBeatManager = new 
HeartBeatManager(evaluatorSettings, remoteId);
-                    ContextManager contextManager = new 
ContextManager(heartBeatManager, rootServiceConfig,
-                        rootTaskConfig);
+                    ContextManager contextManager = new 
ContextManager(heartBeatManager, rootServiceConfig, rootTaskConfig);
                     EvaluatorRuntime evaluatorRuntime = new 
EvaluatorRuntime(contextManager, heartBeatManager);
 
                     // TODO: replace with injectionFuture
@@ -135,9 +123,7 @@ namespace Org.Apache.REEF.Evaluator
 
                     SetRuntimeHandlers(evaluatorRuntime, clock);
 
-
-                    Task evaluatorTask = Task.Run(new Action(clock.Run));
-                    evaluatorTask.Wait();
+                    clock.Run();
                 }
             }
             catch (Exception e)
@@ -150,7 +136,7 @@ namespace Org.Apache.REEF.Evaluator
         /// Determines whether debugging is enabled.
         /// </summary>
         /// <returns>true, if debugging is enabled</returns>
-        private static Boolean IsDebuggingEnabled()
+        private static bool IsDebuggingEnabled()
         {
             var debugEnabledString = 
Environment.GetEnvironmentVariable("Org.Apache.REEF.EvaluatorDebug");
             return !string.IsNullOrWhiteSpace(debugEnabledString) &&
@@ -160,23 +146,18 @@ namespace Org.Apache.REEF.Evaluator
         /// <summary>
         /// Waits for the debugger to be attached.
         /// </summary>
-        private static void AttachDebuggerIfEnabled()
+        private static void AttachDebugger()
         {
-            if (IsDebuggingEnabled())
+            // Wait for the debugger
+            while (true)
             {
-                while (true)
+                if (Debugger.IsAttached)
                 {
-                    if (Debugger.IsAttached)
-                    {
-                        break;
-                    }
-                    else
-                    {
-                        _logger.Log(Level.Info,
-                            "Evaluator in debug mode, waiting for debugger to 
be attached...");
-                        Thread.Sleep(2000);
-                    }
+                    break;
                 }
+
+                logger.Log(Level.Info, "Evaluator in debug mode, waiting for 
debugger to be attached...");
+                Thread.Sleep(2000);
             }
         }
 
@@ -192,10 +173,10 @@ namespace Org.Apache.REEF.Evaluator
             if (!string.IsNullOrWhiteSpace(heartbeatPeriodFromConfig) &&
                 int.TryParse(heartbeatPeriodFromConfig, out heartbeatPeriod))
             {
-                _heartbeatPeriodInMs = heartbeatPeriod;
+                heartbeatPeriodInMs = heartbeatPeriod;
             }
-            _logger.Log(Level.Verbose,
-                "Evaluator heartbeat period set to be " + _heartbeatPeriodInMs 
+ " milliSeconds.");
+            logger.Log(Level.Verbose,
+                "Evaluator heartbeat period set to be " + heartbeatPeriodInMs 
+ " milliSeconds.");
         }
 
         /// <summary>
@@ -210,10 +191,10 @@ namespace Org.Apache.REEF.Evaluator
             if (!string.IsNullOrWhiteSpace(heartbeatMaxRetryFromConfig) &&
                 int.TryParse(heartbeatMaxRetryFromConfig, out 
maxHeartbeatRetry))
             {
-                _heartbeatMaxRetry = maxHeartbeatRetry;
+                heartbeatMaxRetry = maxHeartbeatRetry;
             }
-            _logger.Log(Level.Verbose,
-                "Evaluator heartbeat max retry set to be " + 
_heartbeatMaxRetry + " times.");
+            logger.Log(Level.Verbose,
+                "Evaluator heartbeat max retry set to be " + heartbeatMaxRetry 
+ " times.");
         }
 
 
@@ -225,7 +206,7 @@ namespace Org.Apache.REEF.Evaluator
         {
             try
             {
-                _injector = 
TangFactory.GetTang().NewInjector(ReadEvaluatorConfiguration());
+                injector = 
TangFactory.GetTang().NewInjector(ReadEvaluatorConfiguration());
             }
             catch (Exception e)
             {
@@ -281,11 +262,11 @@ namespace Org.Apache.REEF.Evaluator
             ISet<TraceListener> customTraceListeners;
             try
             {
-                customTraceListeners = 
_injector.GetInstance<CustomTraceListeners>().Listeners;
+                customTraceListeners = 
injector.GetInstance<CustomTraceListeners>().Listeners;
             }
             catch (Exception e)
             {
-                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, 
_logger);
+                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, 
logger);
                 // custom trace listener not set properly, use empty set
                 customTraceListeners = new HashSet<TraceListener>();
             }
@@ -293,8 +274,8 @@ namespace Org.Apache.REEF.Evaluator
             {
                 Logger.AddTraceListener(listener);
             }
-            _logger = Logger.GetLogger(typeof(Evaluator));
-            CustomTraceLevel traceLevel = 
_injector.GetInstance<CustomTraceLevel>();
+            logger = Logger.GetLogger(typeof(Evaluator));
+            CustomTraceLevel traceLevel = 
injector.GetInstance<CustomTraceLevel>();
             Logger.SetCustomLevel(traceLevel.TraceLevel);
         }
 
@@ -303,43 +284,22 @@ namespace Org.Apache.REEF.Evaluator
             Fail((Exception)e.ExceptionObject);
         }
 
-        private static string GetDirectoryListing(string path, StringBuilder 
resultBuilder = null)
-        {
-            if (null == resultBuilder)
-            {
-                resultBuilder = new StringBuilder();
-            }
-
-            // First, add the files to the listing
-            var files = Directory.GetFiles(path).Select(e => 
Path.Combine(path, e));
-            resultBuilder.Append(string.Join(", ", files));
-            // Second, add the directories recursively
-            var dirs = Directory.GetDirectories(path).Select(e => 
Path.Combine(path, e));
-            foreach (var dir in dirs)
-            {
-                GetDirectoryListing(dir, resultBuilder);
-            }
-            return resultBuilder.ToString();
-        }
-
         // set the handlers for runtimeclock manually
         // we only need runtimestart and runtimestop handlers now
         private static void SetRuntimeHandlers(EvaluatorRuntime 
evaluatorRuntime, RuntimeClock clock)
         {
             ISet<IObserver<RuntimeStart>> runtimeStarts = new 
HashSet<IObserver<RuntimeStart>> { evaluatorRuntime };
-            InjectionFutureImpl<ISet<IObserver<RuntimeStart>>> 
injectRuntimeStart = new 
InjectionFutureImpl<ISet<IObserver<RuntimeStart>>>(runtimeStarts);
-            clock.InjectedRuntimeStartHandler = injectRuntimeStart;
+            clock.InjectedRuntimeStartHandler = new 
InjectionFutureImpl<ISet<IObserver<RuntimeStart>>>(runtimeStarts);
 
             ISet<IObserver<RuntimeStop>> runtimeStops = new 
HashSet<IObserver<RuntimeStop>> { evaluatorRuntime };
-            InjectionFutureImpl<ISet<IObserver<RuntimeStop>>> 
injectRuntimeStop = new 
InjectionFutureImpl<ISet<IObserver<RuntimeStop>>>(runtimeStops);
-            clock.InjectedRuntimeStopHandler = injectRuntimeStop;
+            clock.InjectedRuntimeStopHandler = new 
InjectionFutureImpl<ISet<IObserver<RuntimeStop>>>(runtimeStops);
         }
 
         private static void Fail(Exception ex)
         {
             var message = "Unhandled exception caught in Evaluator. Current 
files in the working directory: " +
-                          GetDirectoryListing(Directory.GetCurrentDirectory());
-            Utilities.Diagnostics.Exceptions.Throw(ex, message, _logger);
+                          string.Join(", ", 
Directory.EnumerateFiles(Directory.GetCurrentDirectory(), "*.*", 
SearchOption.AllDirectories));
+            Utilities.Diagnostics.Exceptions.Throw(ex, message, logger);
         }
     }
 }
\ No newline at end of file

Reply via email to