Repository: reef Updated Branches: refs/heads/master 372223469 -> 30a1dac46
[REEF-1039] Clean up evaluator runtime and context management This addresses the issue by: * Cleaning up incorrect usage of Task.Start. * Removing usage of Stack in ContextManager. * Cleaning up style issues. JIRA: [REEF-1039](https://issues.apache.org/jira/browse/REEF-1039) Pull request: This closes #701 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/30a1dac4 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/30a1dac4 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/30a1dac4 Branch: refs/heads/master Commit: 30a1dac46187d11c6567e951de9b7af4ea73c084 Parents: 3722234 Author: Andrew Chung <[email protected]> Authored: Thu Dec 3 15:08:23 2015 -0800 Committer: Dongjoon Hyun <[email protected]> Committed: Sat Dec 5 22:25:28 2015 +0900 ---------------------------------------------------------------------- .../Runtime/Evaluator/Context/ContextManager.cs | 150 ++++++++++--------- .../Runtime/Evaluator/Context/ContextRuntime.cs | 21 ++- .../Runtime/Evaluator/EvaluatorRuntime.cs | 91 ++++------- lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs | 144 +++++++----------- 4 files changed, 185 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/30a1dac4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index ac27723..b054d30 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -31,15 +31,13 @@ using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { - public class ContextManager : IDisposable 
+ public sealed class ContextManager : IDisposable { private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager)); - - private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>(); - private readonly HeartBeatManager _heartBeatManager; - private readonly RootContextLauncher _rootContextLauncher; + private readonly object _contextLock = new object(); + private ContextRuntime _topContext = null; public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig) { @@ -55,22 +53,21 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// </summary> public void Start() { - lock (_contextStack) + lock (_contextLock) { - ContextRuntime rootContext = _rootContextLauncher.GetRootContext(); - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id)); - _contextStack.Push(rootContext); + _topContext = _rootContextLauncher.GetRootContext(); + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", _topContext.Id)); if (_rootContextLauncher.RootTaskConfig.IsPresent()) { LOGGER.Log(Level.Info, "Launching the initial Task"); try { - _contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager); + _topContext.StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager); } catch (TaskClientCodeException e) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER); + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER); HandleTaskException(e); } } @@ -79,9 +76,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context public bool ContextStackIsEmpty() { - lock (_contextStack) + lock 
(_contextLock) { - return _contextStack.Count == 0; + return _topContext == null; } } @@ -99,7 +96,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context byte[] message = controlMessage.task_message; if (controlMessage.add_context != null && controlMessage.remove_context != null) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER); + Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER); } if (controlMessage.add_context != null) { @@ -132,56 +129,67 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context else if (controlMessage.stop_task != null) { LOGGER.Log(Level.Info, "CloseTask"); - _contextStack.Peek().CloseTask(message); + lock (_contextLock) + { + _topContext.CloseTask(message); + } } else if (controlMessage.suspend_task != null) { LOGGER.Log(Level.Info, "SuspendTask"); - _contextStack.Peek().SuspendTask(message); + lock (_contextLock) + { + _topContext.SuspendTask(message); + } } else if (controlMessage.task_message != null) { LOGGER.Log(Level.Info, "DeliverTaskMessage"); - _contextStack.Peek().DeliverTaskMessage(message); + lock (_contextLock) + { + _topContext.DeliverTaskMessage(message); + } } else if (controlMessage.context_message != null) { LOGGER.Log(Level.Info, "Handle context contol message"); ContextMessageProto contextMessageProto = controlMessage.context_message; - bool deliveredMessage = false; - foreach (ContextRuntime context in _contextStack) + ContextRuntime context = null; + lock (_contextLock) { - if (context.Id.Equals(contextMessageProto.context_id)) + if (_topContext != null) { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message)); - context.HandleContextMessaage(controlMessage.context_message.message); - 
deliveredMessage = true; - break; + context = _topContext.GetContextStack().FirstOrDefault(ctx => ctx.Id.Equals(contextMessageProto.context_id)); } } - if (!deliveredMessage) + + if (context != null) + { + context.HandleContextMessage(controlMessage.context_message.message); + } + else { - InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id)); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } } else { InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString())); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } } catch (Exception e) { if (e is TaskClientCodeException) { - HandleTaskException((TaskClientCodeException)e); + HandleTaskException(e as TaskClientCodeException); } else if (e is ContextClientCodeException) { - HandlContextException((ContextClientCodeException)e); + HandleContextException(e as ContextClientCodeException); } - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER); + Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER); } } @@ -191,13 +199,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// <returns>the TaskStatusProto of the currently running task, if there is any</returns> public Optional<TaskStatusProto> GetTaskStatus() { - if (_contextStack.Count == 0) + lock (_contextLock) { - return Optional<TaskStatusProto>.Empty(); - - // throw new InvalidOperationException("Asked for an Task status while there isn't even a context running."); + return 
_topContext == null ? Optional<TaskStatusProto>.Empty() : _topContext.GetTaskStatus(); } - return _contextStack.Peek().GetTaskStatus(); } /// <summary> @@ -207,11 +212,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context public ICollection<ContextStatusProto> GetContextStatusCollection() { ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>(); - foreach (ContextRuntime runtime in _contextStack) + lock (_contextLock) { - ContextStatusProto contextStatusProto = runtime.GetContextStatus(); - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto)); - result.Add(contextStatusProto); + if (_topContext != null) + { + foreach (var contextStatusProto in _topContext.GetContextStack().Select(context => context.GetContextStatus())) + { + LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto)); + result.Add(contextStatusProto); + } + } } return result; } @@ -222,16 +232,13 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// </summary> public void Dispose() { - lock (_contextStack) + lock (_contextLock) { - if (_contextStack != null && _contextStack.Any()) + if (_topContext != null) { LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime."); - ContextRuntime runtime = _contextStack.Last(); - if (runtime != null) - { - runtime.Dispose(); - } + _topContext.Dispose(); + _topContext = null; } } } @@ -241,7 +248,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// </summary> internal void HandleDriverConnectionMessage(IDriverConnectionMessage message) { - _contextStack.Peek().HandleDriverConnectionMessage(message); + lock (_contextLock) + { + _topContext.HandleDriverConnectionMessage(message); + } } /// <summary> @@ -250,29 +260,29 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// <param name="addContextProto"></param> private void 
AddContext(AddContextProto addContextProto) { - lock (_contextStack) + lock (_contextLock) { - ContextRuntime currentTopContext = _contextStack.Peek(); + var currentTopContext = _topContext; if (!currentTopContext.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase)) { var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}", addContextProto.parent_context_id, currentTopContext.Id)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } string contextConfigString = addContextProto.context_configuration; - ContextConfiguration contextConfiguration = new ContextConfiguration(contextConfigString); + var contextConfiguration = new ContextConfiguration(contextConfigString); ContextRuntime newTopContext; if (addContextProto.service_configuration != null) { - ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration); + var serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration); newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig); } else { newTopContext = currentTopContext.SpawnChildContext(contextConfiguration); } - _contextStack.Push(newTopContext); + _topContext = newTopContext; } } @@ -282,23 +292,26 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// <param name="contextId"> context id</param> private void RemoveContext(string contextId) { - lock (_contextStack) + lock (_contextLock) { - string currentTopContextId = _contextStack.Peek().Id; - if (!contextId.Equals(_contextStack.Peek().Id, StringComparison.OrdinalIgnoreCase)) + string currentTopContextId = _topContext.Id; + if (!contextId.Equals(_topContext.Id, StringComparison.OrdinalIgnoreCase)) { var e = new 
InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContextId)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } - _contextStack.Peek().Dispose(); - if (_contextStack.Count > 1) + var hasParentContext = _topContext.ParentContext.IsPresent(); + _topContext.Dispose(); + if (hasParentContext) { // We did not close the root context. Therefore, we need to inform the // driver explicitly that this context is closed. The root context notification // is implicit in the Evaluator close/done notification. _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state } - _contextStack.Pop(); + + // does not matter if null. + _topContext = _topContext.ParentContext.Value; } // System.gc(); // TODO: garbage collect? } @@ -309,14 +322,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// <param name="startTaskProto"></param> private void StartTask(StartTaskProto startTaskProto) { - lock (_contextStack) + lock (_contextLock) { - ContextRuntime currentActiveContext = _contextStack.Peek(); + ContextRuntime currentActiveContext = _topContext; string expectedContextId = startTaskProto.context_id; if (!expectedContextId.Equals(currentActiveContext.Id, StringComparison.OrdinalIgnoreCase)) { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + var e = new InvalidOperationException( + string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id)); + Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } TaskConfiguration taskConfiguration = new 
TaskConfiguration(startTaskProto.configuration); currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager); @@ -338,7 +352,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context result = exception, state = State.FAILED }; - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeatb for a failed task: {0}", taskStatus.ToString())); + LOGGER.Log(Level.Error, "Sending Heartbeat for a failed task: {0}", taskStatus); _heartBeatManager.OnNext(taskStatus); } @@ -346,7 +360,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager /// </summary> /// <param name="e"></param> - private void HandlContextException(ContextClientCodeException e) + private void HandleContextException(ContextClientCodeException e) { LOGGER.Log(Level.Error, "ContextClientCodeException", e); byte[] exception = ByteUtilities.StringToByteArrays(e.ToString()); @@ -360,7 +374,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { contextStatusProto.parent_id = e.ParentId.Value; } - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString())); + LOGGER.Log(Level.Error, "Sending Heartbeat for a failed context: {0}", contextStatusProto); _heartBeatManager.OnNext(contextStatusProto); } } http://git-wip-us.apache.org/repos/asf/reef/blob/30a1dac4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 1b8854c..8390df5 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -18,6 +18,7 
@@ */ using System; +using System.Collections.Generic; using System.Globalization; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; @@ -331,9 +332,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } } - public void HandleContextMessaage(byte[] mesage) + [Obsolete("Deprecated in 0.14, please use HandleContextMessage instead.")] + public void HandleContextMessaage(byte[] message) { - _contextLifeCycle.HandleContextMessage(mesage); + _contextLifeCycle.HandleContextMessage(message); + } + + public void HandleContextMessage(byte[] message) + { + _contextLifeCycle.HandleContextMessage(message); } /// <summary> @@ -442,6 +449,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context _task.Value.HandleDriverConnectionMessage(message); } } + + internal IEnumerable<ContextRuntime> GetContextStack() + { + var context = Optional<ContextRuntime>.Of(this); + while (context.IsPresent()) + { + yield return context.Value; + context = context.Value.ParentContext; + } + } } } ////<summary> http://git-wip-us.apache.org/repos/asf/reef/blob/30a1dac4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index 51f3bd2..a076e5d 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -24,42 +24,35 @@ using Org.Apache.REEF.Common.Runtime.Evaluator.Context; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Time; using Org.Apache.REEF.Wake.Time.Runtime.Event; namespace Org.Apache.REEF.Common.Runtime.Evaluator { - public class EvaluatorRuntime : IObserver<RuntimeStart>, 
IObserver<RuntimeStop>, IObserver<REEFMessage> + public sealed class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage> { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime)); + private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorRuntime)); private readonly string _evaluatorId; - private readonly ContextManager _contextManager; - private readonly HeartBeatManager _heartBeatManager; - - private readonly IRemoteManager<REEFMessage> _remoteManager; - private readonly IClock _clock; + private readonly IDisposable _evaluatorControlChannel; private State _state = State.INIT; - private readonly IDisposable _evaluatorControlChannel; - [Inject] public EvaluatorRuntime( ContextManager contextManager, HeartBeatManager heartBeatManager) { - using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime")) + using (Logger.LogFunction("EvaluatorRuntime::EvaluatorRuntime")) { _clock = heartBeatManager.EvaluatorSettings.RuntimeClock; _heartBeatManager = heartBeatManager; _contextManager = contextManager; _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId; - _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager; + var remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager; ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver(); @@ -67,7 +60,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator driverObserver.Subscribe(o => OnNext(o.Message)); // register the driver observer - _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver); + _evaluatorControlChannel = remoteManager.RegisterObserver(driverObserver); // start the hearbeat _clock.ScheduleAlarm(0, heartBeatManager); @@ -86,7 +79,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { lock (_heartBeatManager) { - LOGGER.Log(Level.Info, "Handle Evaluator control message"); + Logger.Log(Level.Info, "Handle Evaluator control message"); if 
(!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase)) { Handle(new InvalidOperationException( @@ -101,13 +94,13 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { if (message.context_control != null) { - LOGGER.Log(Level.Info, "Send task control message to ContextManager"); + Logger.Log(Level.Info, "Send task control message to ContextManager"); try { _contextManager.HandleTaskControl(message.context_control); if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING) { - LOGGER.Log(Level.Info, "Context stack is empty, done"); + Logger.Log(Level.Info, "Context stack is empty, done"); _state = State.DONE; _heartBeatManager.OnNext(GetEvaluatorStatus()); _clock.Dispose(); @@ -115,14 +108,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } catch (Exception e) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, Logger); Handle(e); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER); + Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), Logger); } } if (message.kill_evaluator != null) { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId)); + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId)); _state = State.KILLED; _clock.Dispose(); } @@ -132,8 +125,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator public EvaluatorStatusProto GetEvaluatorStatus() { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state)); - EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto() + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state)); + EvaluatorStatusProto evaluatorStatusProto = new 
EvaluatorStatusProto { evaluator_id = _evaluatorId, state = _state @@ -147,11 +140,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { try { - LOGGER.Log(Level.Info, "Runtime start"); + Logger.Log(Level.Info, "Runtime start"); if (_state != State.INIT) { var e = new InvalidOperationException("State should be init."); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + Utilities.Diagnostics.Exceptions.Throw(e, Logger); } _state = State.RUNNING; _contextManager.Start(); @@ -159,45 +152,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } catch (Exception e) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, Logger); Handle(e); } } } - void IObserver<RuntimeStart>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<REEFMessage>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<REEFMessage>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStop>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStop>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStart>.OnCompleted() - { - throw new NotImplementedException(); - } - public void OnNext(RuntimeStop runtimeStop) { - LOGGER.Log(Level.Info, "Runtime stop"); + Logger.Log(Level.Info, "Runtime stop"); _contextManager.Dispose(); if (_state == State.RUNNING) @@ -211,16 +174,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } catch (Exception e) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER); + Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", Logger); } 
- LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete"); + Logger.Log(Level.Info, "EvaluatorRuntime shutdown complete"); } public void OnNext(REEFMessage value) { if (value != null && value.evaluatorControl != null) { - LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl"); + Logger.Log(Level.Info, "Received a REEFMessage with EvaluatorControl"); Handle(value.evaluatorControl); } } @@ -229,7 +192,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { lock (_heartBeatManager) { - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e); + Logger.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e); _state = State.FAILED; string errorMessage = string.Format( CultureInfo.InvariantCulture, @@ -247,5 +210,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator _contextManager.Dispose(); } } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/30a1dac4/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs index 0ffce12..f504e77 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs @@ -21,12 +21,9 @@ using System; using System.Collections.Generic; using System.Configuration; using System.Diagnostics; -using System.Globalization; using System.IO; using System.Linq; -using System.Text; using System.Threading; -using System.Threading.Tasks; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator; using Org.Apache.REEF.Common.Runtime.Evaluator.Context; @@ 
-45,41 +42,34 @@ using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; using Org.Apache.REEF.Wake.Time.Runtime; using Org.Apache.REEF.Wake.Time.Runtime.Event; -using Org.Apache.REEF.Wake.Util; namespace Org.Apache.REEF.Evaluator { public sealed class Evaluator { - private static Logger _logger = Logger.GetLogger(typeof(Evaluator)); - - private static int _heartbeatPeriodInMs = Constants.DefaultEvaluatorHeartbeatPeriodInMs; - - private static int _heartbeatMaxRetry = Constants.DefaultEvaluatorHeartbeatMaxRetry; - - private static IInjector _injector; - - private static EvaluatorConfigurations _evaluatorConfig; + private static Logger logger = Logger.GetLogger(typeof(Evaluator)); + private static int heartbeatPeriodInMs = Constants.DefaultEvaluatorHeartbeatPeriodInMs; + private static int heartbeatMaxRetry = Constants.DefaultEvaluatorHeartbeatMaxRetry; + private static IInjector injector; + private static EvaluatorConfigurations evaluatorConfig; public static void Main(string[] args) { - try { - Console.WriteLine(string.Format(CultureInfo.InvariantCulture, "START: {0} Evaluator::InitInjector.", - DateTime.Now)); + Console.WriteLine("START: {0} Evaluator::InitInjector.", DateTime.Now); Stopwatch timer = new Stopwatch(); InitInjector(); - SetCustomTraceListeners(); // _logger is reset by this. + SetCustomTraceListeners(); // logger is reset by this. timer.Stop(); - Console.WriteLine(string.Format(CultureInfo.InvariantCulture, - "EXIT: {0} Evaluator::InitInjector. Duration: [{1}].", DateTime.Now, timer.Elapsed)); - + Console.WriteLine("EXIT: {0} Evaluator::InitInjector. 
Duration: [{1}].", DateTime.Now, timer.Elapsed); - using (_logger.LogScope("Evaluator::Main")) + using (logger.LogScope("Evaluator::Main")) { - // Wait for the debugger, if enabled - AttachDebuggerIfEnabled(); + if (IsDebuggingEnabled()) + { + AttachDebugger(); + } // Register our exception handler AppDomain.CurrentDomain.UnhandledException += UnhandledExceptionHandler; @@ -93,40 +83,38 @@ namespace Org.Apache.REEF.Evaluator if (args.Count() != 1) { var e = new InvalidOperationException("must supply only the evaluator config file!"); - Utilities.Diagnostics.Exceptions.Throw(e, _logger); + Utilities.Diagnostics.Exceptions.Throw(e, logger); } // evaluator configuration file string evaluatorConfigurationPath = args[0]; // Parse the evaluator configuration. - _evaluatorConfig = new EvaluatorConfigurations(evaluatorConfigurationPath); + evaluatorConfig = new EvaluatorConfigurations(evaluatorConfigurationPath); - string rId = _evaluatorConfig.ErrorHandlerRID; - ContextConfiguration rootContextConfiguration = _evaluatorConfig.RootContextConfiguration; - Optional<TaskConfiguration> rootTaskConfig = _evaluatorConfig.TaskConfiguration; - Optional<ServiceConfiguration> rootServiceConfig = _evaluatorConfig.RootServiceConfiguration; + string rId = evaluatorConfig.ErrorHandlerRID; + ContextConfiguration rootContextConfiguration = evaluatorConfig.RootContextConfiguration; + Optional<TaskConfiguration> rootTaskConfig = evaluatorConfig.TaskConfiguration; + Optional<ServiceConfiguration> rootServiceConfig = evaluatorConfig.RootServiceConfiguration; // remoteManager used as client-only in evaluator - IRemoteManager<REEFMessage> remoteManager = _injector.GetInstance<IRemoteManagerFactory>().GetInstance(new REEFMessageCodec()); + IRemoteManager<REEFMessage> remoteManager = injector.GetInstance<IRemoteManagerFactory>().GetInstance(new REEFMessageCodec()); IRemoteIdentifier remoteId = new SocketRemoteIdentifier(NetUtilities.ParseIpEndpoint(rId)); - RuntimeClock clock = 
InstantiateClock(); - _logger.Log(Level.Info, "Application Id: " + _evaluatorConfig.ApplicationId); + logger.Log(Level.Info, "Application Id: " + evaluatorConfig.ApplicationId); EvaluatorSettings evaluatorSettings = new EvaluatorSettings( - _evaluatorConfig.ApplicationId, - _evaluatorConfig.EvaluatorId, - _heartbeatPeriodInMs, - _heartbeatMaxRetry, + evaluatorConfig.ApplicationId, + evaluatorConfig.EvaluatorId, + heartbeatPeriodInMs, + heartbeatMaxRetry, rootContextConfiguration, clock, remoteManager, - _injector); + injector); HeartBeatManager heartBeatManager = new HeartBeatManager(evaluatorSettings, remoteId); - ContextManager contextManager = new ContextManager(heartBeatManager, rootServiceConfig, - rootTaskConfig); + ContextManager contextManager = new ContextManager(heartBeatManager, rootServiceConfig, rootTaskConfig); EvaluatorRuntime evaluatorRuntime = new EvaluatorRuntime(contextManager, heartBeatManager); // TODO: replace with injectionFuture @@ -135,9 +123,7 @@ namespace Org.Apache.REEF.Evaluator SetRuntimeHandlers(evaluatorRuntime, clock); - - Task evaluatorTask = Task.Run(new Action(clock.Run)); - evaluatorTask.Wait(); + clock.Run(); } } catch (Exception e) @@ -150,7 +136,7 @@ namespace Org.Apache.REEF.Evaluator /// Determines whether debugging is enabled. /// </summary> /// <returns>true, if debugging is enabled</returns> - private static Boolean IsDebuggingEnabled() + private static bool IsDebuggingEnabled() { var debugEnabledString = Environment.GetEnvironmentVariable("Org.Apache.REEF.EvaluatorDebug"); return !string.IsNullOrWhiteSpace(debugEnabledString) && @@ -160,23 +146,18 @@ namespace Org.Apache.REEF.Evaluator /// <summary> /// Waits for the debugger to be attached. 
/// </summary> - private static void AttachDebuggerIfEnabled() + private static void AttachDebugger() { - if (IsDebuggingEnabled()) + // Wait for the debugger + while (true) { - while (true) + if (Debugger.IsAttached) { - if (Debugger.IsAttached) - { - break; - } - else - { - _logger.Log(Level.Info, - "Evaluator in debug mode, waiting for debugger to be attached..."); - Thread.Sleep(2000); - } + break; } + + logger.Log(Level.Info, "Evaluator in debug mode, waiting for debugger to be attached..."); + Thread.Sleep(2000); } } @@ -192,10 +173,10 @@ namespace Org.Apache.REEF.Evaluator if (!string.IsNullOrWhiteSpace(heartbeatPeriodFromConfig) && int.TryParse(heartbeatPeriodFromConfig, out heartbeatPeriod)) { - _heartbeatPeriodInMs = heartbeatPeriod; + heartbeatPeriodInMs = heartbeatPeriod; } - _logger.Log(Level.Verbose, - "Evaluator heartbeat period set to be " + _heartbeatPeriodInMs + " milliSeconds."); + logger.Log(Level.Verbose, + "Evaluator heartbeat period set to be " + heartbeatPeriodInMs + " milliSeconds."); } /// <summary> @@ -210,10 +191,10 @@ namespace Org.Apache.REEF.Evaluator if (!string.IsNullOrWhiteSpace(heartbeatMaxRetryFromConfig) && int.TryParse(heartbeatMaxRetryFromConfig, out maxHeartbeatRetry)) { - _heartbeatMaxRetry = maxHeartbeatRetry; + heartbeatMaxRetry = maxHeartbeatRetry; } - _logger.Log(Level.Verbose, - "Evaluator heartbeat max retry set to be " + _heartbeatMaxRetry + " times."); + logger.Log(Level.Verbose, + "Evaluator heartbeat max retry set to be " + heartbeatMaxRetry + " times."); } @@ -225,7 +206,7 @@ namespace Org.Apache.REEF.Evaluator { try { - _injector = TangFactory.GetTang().NewInjector(ReadEvaluatorConfiguration()); + injector = TangFactory.GetTang().NewInjector(ReadEvaluatorConfiguration()); } catch (Exception e) { @@ -281,11 +262,11 @@ namespace Org.Apache.REEF.Evaluator ISet<TraceListener> customTraceListeners; try { - customTraceListeners = _injector.GetInstance<CustomTraceListeners>().Listeners; + customTraceListeners = 
injector.GetInstance<CustomTraceListeners>().Listeners; } catch (Exception e) { - Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, _logger); + Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, logger); // custom trace listener not set properly, use empty set customTraceListeners = new HashSet<TraceListener>(); } @@ -293,8 +274,8 @@ namespace Org.Apache.REEF.Evaluator { Logger.AddTraceListener(listener); } - _logger = Logger.GetLogger(typeof(Evaluator)); - CustomTraceLevel traceLevel = _injector.GetInstance<CustomTraceLevel>(); + logger = Logger.GetLogger(typeof(Evaluator)); + CustomTraceLevel traceLevel = injector.GetInstance<CustomTraceLevel>(); Logger.SetCustomLevel(traceLevel.TraceLevel); } @@ -303,43 +284,22 @@ namespace Org.Apache.REEF.Evaluator Fail((Exception)e.ExceptionObject); } - private static string GetDirectoryListing(string path, StringBuilder resultBuilder = null) - { - if (null == resultBuilder) - { - resultBuilder = new StringBuilder(); - } - - // First, add the files to the listing - var files = Directory.GetFiles(path).Select(e => Path.Combine(path, e)); - resultBuilder.Append(string.Join(", ", files)); - // Second, add the directories recursively - var dirs = Directory.GetDirectories(path).Select(e => Path.Combine(path, e)); - foreach (var dir in dirs) - { - GetDirectoryListing(dir, resultBuilder); - } - return resultBuilder.ToString(); - } - // set the handlers for runtimeclock manually // we only need runtimestart and runtimestop handlers now private static void SetRuntimeHandlers(EvaluatorRuntime evaluatorRuntime, RuntimeClock clock) { ISet<IObserver<RuntimeStart>> runtimeStarts = new HashSet<IObserver<RuntimeStart>> { evaluatorRuntime }; - InjectionFutureImpl<ISet<IObserver<RuntimeStart>>> injectRuntimeStart = new InjectionFutureImpl<ISet<IObserver<RuntimeStart>>>(runtimeStarts); - clock.InjectedRuntimeStartHandler = injectRuntimeStart; + clock.InjectedRuntimeStartHandler = new 
InjectionFutureImpl<ISet<IObserver<RuntimeStart>>>(runtimeStarts); ISet<IObserver<RuntimeStop>> runtimeStops = new HashSet<IObserver<RuntimeStop>> { evaluatorRuntime }; - InjectionFutureImpl<ISet<IObserver<RuntimeStop>>> injectRuntimeStop = new InjectionFutureImpl<ISet<IObserver<RuntimeStop>>>(runtimeStops); - clock.InjectedRuntimeStopHandler = injectRuntimeStop; + clock.InjectedRuntimeStopHandler = new InjectionFutureImpl<ISet<IObserver<RuntimeStop>>>(runtimeStops); } private static void Fail(Exception ex) { var message = "Unhandled exception caught in Evaluator. Current files in the working directory: " + - GetDirectoryListing(Directory.GetCurrentDirectory()); - Utilities.Diagnostics.Exceptions.Throw(ex, message, _logger); + string.Join(", ", Directory.EnumerateFiles(Directory.GetCurrentDirectory(), "*.*", SearchOption.AllDirectories)); + Utilities.Diagnostics.Exceptions.Throw(ex, message, logger); } } } \ No newline at end of file
