http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs new file mode 100644 index 0000000..9231575 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Globalization;
+using System.Linq;
+using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Task;
+using Org.Apache.REEF.Common.Services;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
+{
+    /// <summary>
+    /// Maintains the stack of contexts on this evaluator (root context at the bottom, currently active
+    /// context on top), applies ContextControlProto messages to that stack, and reports task / context
+    /// failures through the HeartBeatManager.
+    /// </summary>
+    public class ContextManager : IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager));
+
+        // Root context at the bottom, active context on top. Also used as the lock object for stack mutations.
+        private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>();
+
+        private readonly HeartBeatManager _heartBeatManager;
+
+        // Only assigned in the constructor.
+        private readonly RootContextLauncher _rootContextLauncher;
+
+        /// <summary>
+        /// Create a ContextManager. The root context itself is obtained from the RootContextLauncher in Start().
+        /// </summary>
+        /// <param name="heartBeatManager">used to send heartbeats; its EvaluatorSettings supply the root context configuration.</param>
+        /// <param name="rootServiceConfig">optional service configuration for the root context.</param>
+        /// <param name="rootTaskConfig">optional configuration of a task to launch on the root context in Start().</param>
+        public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+        {
+            using (LOGGER.LogFunction("ContextManager::ContextManager"))
+            {
+                _heartBeatManager = heartBeatManager;
+                _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig);
+            }
+        }
+
+        /// <summary>
+        /// Start the context manager. This instantiates the root context, pushes it onto the stack and,
+        /// if a root task configuration is present, launches that task on the root context.
+        /// A TaskClientCodeException raised while launching the task is passed to HandleTaskException.
+        /// </summary>
+        public void Start()
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime rootContext = _rootContextLauncher.GetRootContext();
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id));
+                _contextStack.Push(rootContext);
+
+                if (_rootContextLauncher.RootTaskConfig.IsPresent())
+                {
+                    LOGGER.Log(Level.Info, "Launching the initial Task");
+                    try
+                    {
+                        // rootContext is the element just pushed, i.e. the top of the stack.
+                        rootContext.StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
+                    }
+                    catch (TaskClientCodeException e)
+                    {
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER);
+                        HandleTaskException(e);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Whether there is currently no context on the stack.
+        /// </summary>
+        /// <returns>true if the context stack is empty.</returns>
+        public bool ContextStackIsEmpty()
+        {
+            lock (_contextStack)
+            {
+                return _contextStack.Count == 0;
+            }
+        }
+
+        // TODO: codes here are slightly different from java since the protobuf.net does not generate the HasXXX method, may want to switch to proto-port later
+
+        /// <summary>
+        /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
+        /// This also triggers the HeartBeatManager to send a heartbeat with the result of this operation.
+        /// </summary>
+        /// <remarks>
+        /// One operation is performed, checked in this order: add context (optionally followed by start task),
+        /// remove context, start task, stop task, suspend task, task message, context message.
+        /// A TaskClientCodeException or ContextClientCodeException is reported via HandleTaskException /
+        /// HandleContextException before every exception is passed on to Exceptions.CaughtAndThrow.
+        /// </remarks>
+        /// <param name="controlMessage">the control message to process.</param>
+        public void HandleTaskControl(ContextControlProto controlMessage)
+        {
+            try
+            {
+                // Payload forwarded to the task by the stop / suspend / task-message operations.
+                byte[] message = controlMessage.task_message;
+                if (controlMessage.add_context != null && controlMessage.remove_context != null)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER);
+                }
+                if (controlMessage.add_context != null)
+                {
+                    LOGGER.Log(Level.Info, "AddContext");
+                    AddContext(controlMessage.add_context);
+
+                    // support submitContextAndTask()
+                    if (controlMessage.start_task != null)
+                    {
+                        LOGGER.Log(Level.Info, "StartTask");
+                        StartTask(controlMessage.start_task);
+                    }
+                    else
+                    {
+                        // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime
+                        // Therefore this call can not go into addContext
+                        LOGGER.Log(Level.Info, "Trigger Heartbeat");
+                        _heartBeatManager.OnNext();
+                    }
+                }
+                else if (controlMessage.remove_context != null)
+                {
+                    LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id));
+                    RemoveContext(controlMessage.remove_context.context_id);
+                }
+                else if (controlMessage.start_task != null)
+                {
+                    LOGGER.Log(Level.Info, "StartTask only");
+                    StartTask(controlMessage.start_task);
+                }
+                else if (controlMessage.stop_task != null)
+                {
+                    LOGGER.Log(Level.Info, "CloseTask");
+                    _contextStack.Peek().CloseTask(message);
+                }
+                else if (controlMessage.suspend_task != null)
+                {
+                    LOGGER.Log(Level.Info, "SuspendTask");
+                    _contextStack.Peek().SuspendTask(message);
+                }
+                else if (controlMessage.task_message != null)
+                {
+                    LOGGER.Log(Level.Info, "DeliverTaskMessage");
+                    _contextStack.Peek().DeliverTaskMessage(message);
+                }
+                else if (controlMessage.context_message != null)
+                {
+                    LOGGER.Log(Level.Info, "Handle context control message");
+                    ContextMessageProto contextMessageProto = controlMessage.context_message;
+                    bool deliveredMessage = false;
+                    foreach (ContextRuntime context in _contextStack)
+                    {
+                        if (context.Id.Equals(contextMessageProto.context_id))
+                        {
+                            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message));
+                            context.HandleContextMessaage(controlMessage.context_message.message);
+                            deliveredMessage = true;
+                            break;
+                        }
+                    }
+                    if (!deliveredMessage)
+                    {
+                        InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id));
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                    }
+                }
+                else
+                {
+                    InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString()));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+            }
+            catch (Exception e)
+            {
+                TaskClientCodeException taskException = e as TaskClientCodeException;
+                ContextClientCodeException contextException = e as ContextClientCodeException;
+                if (taskException != null)
+                {
+                    HandleTaskException(taskException);
+                }
+                else if (contextException != null)
+                {
+                    HandleContextException(contextException);
+                }
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Get TaskStatusProto of the currently running task, if there is any
+        /// </summary>
+        /// <returns>the TaskStatusProto of the task on the top context, or Empty if the stack is empty or no task is reported.</returns>
+        public Optional<TaskStatusProto> GetTaskStatus()
+        {
+            // NOTE(review): reads _contextStack without taking the lock on it.
+            if (_contextStack.Count == 0)
+            {
+                return Optional<TaskStatusProto>.Empty();
+            }
+            return _contextStack.Peek().GetTaskStatus();
+        }
+
+        /// <summary>
+        /// get status of all contexts in the stack.
+        /// </summary>
+        /// <returns>the status of all contexts in the stack, in stack enumeration order (top first).</returns>
+        public ICollection<ContextStatusProto> GetContextStatusCollection()
+        {
+            // NOTE(review): enumerates _contextStack without taking the lock on it.
+            ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>();
+            foreach (ContextRuntime runtime in _contextStack)
+            {
+                ContextStatusProto contextStatusProto = runtime.GetContextStatus();
+                LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto));
+                result.Add(contextStatusProto);
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Shuts down. This forcefully kills the Task if there is one and then shuts down all Contexts on the stack,
+        /// starting at the top.
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_contextStack)
+            {
+                if (_contextStack.Count > 0)
+                {
+                    LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime.");
+
+                    // Stack<T> enumerates from top to bottom, so Last() is the root context. ContextRuntime.Dispose
+                    // closes any running task and child context before closing the context itself.
+                    ContextRuntime runtime = _contextStack.Last();
+                    if (runtime != null)
+                    {
+                        runtime.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Add a context to the stack. The new context is spawned as a child of the current top context,
+        /// which must be the parent named in the proto.
+        /// </summary>
+        /// <param name="addContextProto">the parent context id, context configuration and optional service configuration.</param>
+        private void AddContext(AddContextProto addContextProto)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentTopContext = _contextStack.Peek();
+                if (!currentTopContext.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}",
+                        addContextProto.parent_context_id,
+                        currentTopContext.Id));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                ContextConfiguration contextConfiguration = new ContextConfiguration(addContextProto.context_configuration);
+                ContextRuntime newTopContext;
+                if (addContextProto.service_configuration != null)
+                {
+                    ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration);
+                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig);
+                }
+                else
+                {
+                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration);
+                }
+                _contextStack.Push(newTopContext);
+            }
+        }
+
+        /// <summary>
+        /// Remove the context with the given ID from the stack. Only the top context may be removed.
+        /// </summary>
+        /// <param name="contextId">context id</param>
+        private void RemoveContext(string contextId)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentTopContext = _contextStack.Peek();
+                if (!contextId.Equals(currentTopContext.Id, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContext.Id));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                currentTopContext.Dispose();
+                if (_contextStack.Count > 1)
+                {
+                    // We did not close the root context. Therefore, we need to inform the
+                    // driver explicitly that this context is closed. The root context notification
+                    // is implicit in the Evaluator close/done notification.
+                    // The heartbeat is sent before Pop() so the closed context's DONE status is still on the stack.
+                    _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state
+                }
+                _contextStack.Pop();
+            }
+        }
+
+        /// <summary>
+        /// Launch a Task on the top context, which must be the context named in the proto.
+        /// </summary>
+        /// <param name="startTaskProto">the expected context id and the task configuration.</param>
+        private void StartTask(StartTaskProto startTaskProto)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentActiveContext = _contextStack.Peek();
+                string expectedContextId = startTaskProto.context_id;
+                if (!expectedContextId.Equals(currentActiveContext.Id, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration);
+                currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager);
+            }
+        }
+
+        /// <summary>
+        /// Sends a heartbeat carrying a FAILED TaskStatusProto for the task described by the exception.
+        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
+        /// </summary>
+        /// <param name="e">the task failure; its string form becomes the status result.</param>
+        private void HandleTaskException(TaskClientCodeException e)
+        {
+            LOGGER.Log(Level.Error, "TaskClientCodeException", e);
+            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
+            TaskStatusProto taskStatus = new TaskStatusProto()
+            {
+                context_id = e.ContextId,
+                task_id = e.TaskId,
+                result = exception,
+                state = State.FAILED
+            };
+            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed task: {0}", taskStatus.ToString()));
+            _heartBeatManager.OnNext(taskStatus);
+        }
+
+        /// <summary>
+        /// Sends a heartbeat carrying a FAIL ContextStatusProto for the context described by the exception.
+        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
+        /// </summary>
+        /// <param name="e">the context failure; its string form becomes the status error.</param>
+        private void HandleContextException(ContextClientCodeException e)
+        {
+            LOGGER.Log(Level.Error, "ContextClientCodeException", e);
+            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
+            ContextStatusProto contextStatusProto = new ContextStatusProto()
+            {
+                context_id = e.ContextId,
+                context_state = ContextStatusProto.State.FAIL,
+                error = exception
+            };
+            if (e.ParentId.IsPresent())
+            {
+                contextStatusProto.parent_id = e.ParentId.Value;
+            }
+            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString()));
+            _heartBeatManager.OnNext(contextStatusProto);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
new file mode 100644
index 0000000..6a5f7cb
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Task;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
+{
+    /// <summary>
+    /// Runtime state of one context on the evaluator: its injectors, lifecycle handlers,
+    /// optional parent and child contexts, and the optional task running on it.
+    /// All mutable state is guarded by locking on _contextLifeCycle.
+    /// </summary>
+    public class ContextRuntime
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime));
+
+        // Context-local injector. This contains information that will not be available in child injectors.
+        private readonly IInjector _contextInjector;
+
+        // Service injector. State in this injector moves to child injectors.
+        private readonly IInjector _serviceInjector;
+
+        // Convenience class to hold all the event handlers for the context as well as the service instances.
+        // Also the lock object for this context's mutable state.
+        private readonly ContextLifeCycle _contextLifeCycle;
+
+        // The parent context, if any. Only assigned in the constructor.
+        private readonly Optional<ContextRuntime> _parentContext = Optional<ContextRuntime>.Empty();
+
+        // The child context, if any.
+        private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty();
+
+        // The currently running task, if any.
+        private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty();
+
+        private ContextStatusProto.State _contextState = ContextStatusProto.State.READY;
+
+        /// <summary>
+        /// Create a new ContextRuntime.
+        /// </summary>
+        /// <param name="serviceInjector">the service injector of this context; the context injector is forked from it.</param>
+        /// <param name="contextConfiguration">the Configuration for this context; must be a ContextConfiguration.</param>
+        /// <param name="parentContext">the parent context, or Empty for the root context.</param>
+        public ContextRuntime(
+            IInjector serviceInjector,
+            IConfiguration contextConfiguration,
+            Optional<ContextRuntime> parentContext)
+        {
+            ContextConfiguration config = contextConfiguration as ContextConfiguration;
+            if (config == null)
+            {
+                var e = new ArgumentException("contextConfiguration is not of type ContextConfiguration");
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+            }
+            _contextLifeCycle = new ContextLifeCycle(config.Id);
+            _serviceInjector = serviceInjector;
+            _parentContext = parentContext;
+            try
+            {
+                _contextInjector = serviceInjector.ForkInjector();
+            }
+            catch (Exception e)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+
+                Optional<string> parentId = ParentContext.IsPresent() ?
+                    Optional<string>.Of(ParentContext.Value.Id) :
+                    Optional<string>.Empty();
+                ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e);
+
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+            }
+            // Trigger the context start events on contextInjector.
+            _contextLifeCycle.Start();
+        }
+
+        /// <summary>
+        /// Create a new ContextRuntime for the root context.
+        /// </summary>
+        /// <param name="serviceInjector">the serviceInjector to be used.</param>
+        /// <param name="contextConfiguration">the Configuration for this context.</param>
+        public ContextRuntime(
+            IInjector serviceInjector,
+            IConfiguration contextConfiguration)
+            : this(serviceInjector, contextConfiguration, Optional<ContextRuntime>.Empty())
+        {
+            LOGGER.Log(Level.Info, "Instantiating root context");
+        }
+
+        /// <summary>
+        /// The Id of this context.
+        /// </summary>
+        public string Id
+        {
+            get { return _contextLifeCycle.Id; }
+        }
+
+        /// <summary>
+        /// The parent context, or Empty for the root context.
+        /// </summary>
+        public Optional<ContextRuntime> ParentContext
+        {
+            get { return _parentContext; }
+        }
+
+        /// <summary>
+        /// Spawns a new context.
+        /// The new context will have a serviceInjector that is created by forking the one in this object with the given
+        /// serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector.
+        /// </summary>
+        /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+        /// <param name="serviceConfiguration">the new context's service Configuration.</param>
+        /// <returns>a child context.</returns>
+        public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
+        {
+            ContextRuntime childContext = null;
+            lock (_contextLifeCycle)
+            {
+                EnsureCanSpawnChildContext();
+                try
+                {
+                    IInjector childServiceInjector = _serviceInjector.ForkInjector(new IConfiguration[] { serviceConfiguration });
+                    childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this));
+                    _childContext = Optional<ContextRuntime>.Of(childContext);
+                    return childContext;
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+
+                    // The context that failed to spawn would have been a child of THIS context, so its parent id
+                    // is this context's Id (matching what the child constructor reports for its own failures).
+                    ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), Optional<string>.Of(Id), "Unable to spawn context", e);
+
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+                }
+            }
+            return childContext;
+        }
+
+        /// <summary>
+        /// Spawns a new context without services of its own.
+        /// The new context will have a serviceInjector that is created by forking the one in this object. The
+        /// contextConfiguration is used to fork the contextInjector from that new serviceInjector.
+        /// </summary>
+        /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+        /// <returns>a child context.</returns>
+        public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration)
+        {
+            lock (_contextLifeCycle)
+            {
+                EnsureCanSpawnChildContext();
+                IInjector childServiceInjector = _serviceInjector.ForkInjector();
+                ContextRuntime childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this));
+                _childContext = Optional<ContextRuntime>.Of(childContext);
+                return childContext;
+            }
+        }
+
+        /// <summary>
+        /// Launches a Task on this context. A previously started task that has already ended is cleared first.
+        /// </summary>
+        /// <param name="taskConfiguration">the configuration of the task to start.</param>
+        /// <param name="contextId">the context id handed to the TaskRuntime.</param>
+        /// <param name="heartBeatManager">the heartbeat manager handed to the TaskRuntime.</param>
+        public void StartTask(TaskConfiguration taskConfiguration, string contextId, HeartBeatManager heartBeatManager)
+        {
+            lock (_contextLifeCycle)
+            {
+                bool taskPresent = _task.IsPresent();
+                bool taskEnded = taskPresent && _task.Value.HasEnded();
+
+                LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration) task is present: " + taskPresent + " task has ended: " + taskEnded);
+                if (taskPresent)
+                {
+                    LOGGER.Log(Level.Info, "Task state: " + _task.Value.GetTaskState());
+                }
+
+                if (taskEnded)
+                {
+                    // clean up state of the previous, already ended task
+                    _task = Optional<TaskRuntime>.Empty();
+                    taskPresent = false;
+                }
+                if (taskPresent)
+                {
+                    var e = new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Attempting to start a Task when a Task with id '{0}' is running", _task.Value.TaskId));
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                if (_childContext.IsPresent())
+                {
+                    var e = new InvalidOperationException("Attempting to start a Task on a context that is not the topmost active context.");
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                try
+                {
+                    IInjector taskInjector = _contextInjector.ForkInjector(new IConfiguration[] { taskConfiguration.TangConfig });
+                    LOGGER.Log(Level.Info, "Trying to inject task with configuration " + taskConfiguration.ToString());
+                    TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); // taskInjector.getInstance(TaskRuntime.class);
+                    taskRuntime.Initialize();
+                    System.Threading.Tasks.Task.Run(new Action(taskRuntime.Start));
+                    _task = Optional<TaskRuntime>.Of(taskRuntime);
+                }
+                catch (Exception e)
+                {
+                    var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Close this context. If there is a child context, this recursively closes it before closing this context. If
+        /// there is a Task currently running, that will be closed.
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_contextLifeCycle)
+            {
+                _contextState = ContextStatusProto.State.DONE;
+                if (_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Shutting down an task because the underlying context is being closed.");
+                    _task.Value.Close(null);
+                }
+                if (_childContext.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Closing a context because its parent context is being closed.");
+                    _childContext.Value.Dispose();
+                }
+                _contextLifeCycle.Close();
+                if (_parentContext.IsPresent())
+                {
+                    ParentContext.Value.ResetChildContext();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Issue a suspend call to the Task
+        /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+        /// in the log.
+        /// </summary>
+        /// <param name="message">the suspend message to deliver or null if there is none.</param>
+        public void SuspendTask(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Received a suspend task while there was no task running. Ignored");
+                }
+                else
+                {
+                    _task.Value.Suspend(message);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Issue a close call to the Task
+        /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+        /// in the log.
+        /// </summary>
+        /// <param name="message">the close message to deliver or null if there is none.</param>
+        public void CloseTask(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Received a close task while there was no task running. Ignored");
+                }
+                else
+                {
+                    _task.Value.Close(message);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Deliver a message to the Task
+        /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+        /// in the log.
+        /// </summary>
+        /// <param name="message">the message to deliver or null if there is none.</param>
+        public void DeliverTaskMessage(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Received an task message while there was no task running. Ignored");
+                }
+                else
+                {
+                    _task.Value.Deliver(message);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Forward a context message to this context's lifecycle handlers.
+        /// </summary>
+        /// <param name="mesage">the message payload.</param>
+        public void HandleContextMessaage(byte[] mesage)
+        {
+            _contextLifeCycle.HandleContextMessage(mesage);
+        }
+
+        /// <summary>
+        /// get state of the running Task
+        /// </summary>
+        /// <returns>the status of the running Task if it is RUNNING; Empty if there is no task or it has ended.</returns>
+        public Optional<TaskStatusProto> GetTaskStatus()
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    if (_task.Value.HasEnded())
+                    {
+                        _task = Optional<TaskRuntime>.Empty();
+                        return Optional<TaskStatusProto>.Empty();
+                    }
+                    else
+                    {
+                        TaskStatusProto taskStatusProto = _task.Value.GetStatusProto();
+                        if (taskStatusProto.state == State.RUNNING)
+                        {
+                            // only RUNNING status is allowed to return here, all other state pushed out to heartbeat
+                            return Optional<TaskStatusProto>.Of(taskStatusProto);
+                        }
+                        var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task state must be RUNNING, but instead is in {0} state", taskStatusProto.state));
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                        return Optional<TaskStatusProto>.Empty();
+                    }
+                }
+                else
+                {
+                    return Optional<TaskStatusProto>.Empty();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Reset child context when the child is being closed
+        /// </summary>
+        public void ResetChildContext()
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_childContext.IsPresent())
+                {
+                    _childContext = Optional<ContextRuntime>.Empty();
+                }
+                else
+                {
+                    var e = new InvalidOperationException("no child context set");
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// get context's status in protocol buffer
+        /// </summary>
+        /// <returns>this context's status in protocol buffer form, including any pending context messages.</returns>
+        public ContextStatusProto GetContextStatus()
+        {
+            lock (_contextLifeCycle)
+            {
+                ContextStatusProto contextStatusProto = new ContextStatusProto()
+                {
+                    context_id = Id,
+                    context_state = _contextState,
+                };
+                if (_parentContext.IsPresent())
+                {
+                    contextStatusProto.parent_id = _parentContext.Value.Id;
+                }
+
+                foreach (IContextMessageSource source in _contextLifeCycle.ContextMessageSources)
+                {
+                    Optional<ContextMessage> contextMessageOptional = source.Message;
+                    if (contextMessageOptional.IsPresent())
+                    {
+                        ContextStatusProto.ContextMessageProto contextMessageProto
+                            = new ContextStatusProto.ContextMessageProto()
+                            {
+                                source_id = contextMessageOptional.Value.MessageSourceId,
+                            };
+                        contextMessageProto.message = ByteUtilities.CopyBytesFrom(contextMessageOptional.Value.Bytes);
+                        contextStatusProto.context_message.Add(contextMessageProto);
+                    }
+                }
+                return contextStatusProto;
+            }
+        }
+
+        /// <summary>
+        /// Reports (via Exceptions.Throw) an InvalidOperationException when this context cannot spawn a child:
+        /// a task is present, or a child context already exists. Call while holding the _contextLifeCycle lock.
+        /// </summary>
+        private void EnsureCanSpawnChildContext()
+        {
+            if (_task.IsPresent())
+            {
+                var e = new InvalidOperationException(
+                    string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+            }
+            if (_childContext.IsPresent())
+            {
+                var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextStartImpl.cs
new file mode 100644
index 0000000..be23398
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextStartImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Common.Events;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
+{
+    /// <summary>
+    /// Simple IContextStart implementation that carries the Id of a context.
+    /// </summary>
+    internal class ContextStartImpl : IContextStart
+    {
+        private string _id;
+
+        /// <summary>
+        /// Create an instance for the context with the given Id.
+        /// </summary>
+        /// <param name="id">the context Id.</param>
+        public ContextStartImpl(string id)
+        {
+            _id = id;
+        }
+
+        /// <summary>
+        /// The context Id.
+        /// </summary>
+        public string Id
+        {
+            get { return _id; }
+            set { _id = value; }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextStopImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextStopImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextStopImpl.cs
new file mode 100644
index 0000000..95634d2
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextStopImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Events; + +namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context +{ + class ContextStopImpl : IContextStop + { + public ContextStopImpl(string id) + { + Id = id; + } + + public string Id { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs new file mode 100644 index 0000000..2dd417a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Common.Services;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
+{
+    /// <summary>
+    /// Helper class that encapsulates the root context configuration: With or without services and an initial task.
+    /// </summary>
+    public sealed class RootContextLauncher
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RootContextLauncher));
+
+        // Injector built from the optional root service configuration; created once in the constructor.
+        private readonly IInjector _rootServiceInjector;
+
+        // Created lazily by GetRootContext() and reused on subsequent calls.
+        private ContextRuntime _rootContext;
+
+        private ContextConfiguration _rootContextConfiguration;
+
+        /// <summary>
+        /// Creates the launcher and eagerly builds the service injector.
+        /// </summary>
+        /// <param name="rootContextConfig">Configuration of the root context.</param>
+        /// <param name="rootServiceConfig">Optional service configuration; when absent, an empty injector is used.</param>
+        /// <param name="rootTaskConfig">Optional configuration of the initial task.</param>
+        public RootContextLauncher(ContextConfiguration rootContextConfig, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+        {
+            _rootContextConfiguration = rootContextConfig;
+            _rootServiceInjector = InjectServices(rootServiceConfig);
+            RootTaskConfig = rootTaskConfig;
+        }
+
+        /// <summary>
+        /// Gets or sets the optional configuration of the initial task.
+        /// </summary>
+        public Optional<TaskConfiguration> RootTaskConfig { get; set; }
+
+        /// <summary>
+        /// Gets or sets the root context configuration. Changing it after <see cref="GetRootContext()"/>
+        /// has been called does not affect the already created root context.
+        /// </summary>
+        public ContextConfiguration RootContextConfig
+        {
+            get { return _rootContextConfiguration; }
+            set { _rootContextConfiguration = value; }
+        }
+
+        /// <summary>
+        /// Returns the root context, creating it on first use.
+        /// </summary>
+        /// <returns>The (cached) root <see cref="ContextRuntime"/>.</returns>
+        public ContextRuntime GetRootContext()
+        {
+            if (_rootContext == null)
+            {
+                _rootContext = GetRootContext(_rootServiceInjector, _rootContextConfiguration);
+            }
+            return _rootContext;
+        }
+
+        /// <summary>
+        /// Builds the injector for the root services and instantiates them once, so that
+        /// service construction failures surface here.
+        /// </summary>
+        /// <param name="serviceConfig">Optional service configuration.</param>
+        /// <returns>An injector built from <paramref name="serviceConfig"/>, or an empty injector when absent.</returns>
+        private IInjector InjectServices(Optional<ServiceConfiguration> serviceConfig)
+        {
+            IInjector rootServiceInjector;
+
+            if (serviceConfig.IsPresent())
+            {
+                rootServiceInjector = TangFactory.GetTang().NewInjector(serviceConfig.Value.TangConfig);
+                InjectedServices services = null;
+                try
+                {
+                    services = rootServiceInjector.GetInstance<InjectedServices>();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Failed to instantiate service.", LOGGER);
+
+                    // Each placeholder now maps to its own argument ({0}=exception, {1}=message, {2}=stack trace),
+                    // and the original exception is preserved as the inner exception.
+                    InvalidOperationException ex = new InvalidOperationException(
+                        string.Format(
+                            CultureInfo.InvariantCulture,
+                            "Failed to inject service: encountered error {0} with message [{1}] and stack trace:[{2}]",
+                            e,
+                            e.Message,
+                            e.StackTrace),
+                        e);
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+                }
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "injected {0} service(s)", services.Services.Count));
+            }
+            else
+            {
+                rootServiceInjector = TangFactory.GetTang().NewInjector();
+                LOGGER.Log(Level.Info, "no service provided for injection.");
+            }
+
+            return rootServiceInjector;
+        }
+
+        /// <summary>
+        /// Creates the root <see cref="ContextRuntime"/> from the service injector and context configuration.
+        /// </summary>
+        private ContextRuntime GetRootContext(
+            IInjector rootServiceInjector,
+            IConfiguration rootContextConfiguration)
+        {
+            return new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+        }
+    }
+}
+//if (rootServiceInjector != null)
+//{
+//    try
+//    {
+//        rootServiceInjector = rootServiceInjector.ForkInjector(serviceConfigs);
+//    }
+//    catch (Exception e)
+//    {
+//        throw new ContextClientCodeException(ContextClientCodeException.GetId(rootContextConfiguration),
+//                                             Optional<String>.Empty(),
+//                                             "Unable to instatiate the root context", e);
+//    }
+//    result = new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+//}
+//else
+//{
+//    result = new ContextRuntime(rootServiceInjector.ForkInjector(), rootContextConfiguration);
+//}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
new file mode 100644
index 0000000..3af0aba
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Globalization;
+using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Runtime.Event;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator
+{
+    /// <summary>
+    /// Evaluator-side runtime: tracks the evaluator <see cref="State"/> (INIT, RUNNING, DONE, KILLED, FAILED),
+    /// reacts to runtime start/stop events, and dispatches driver control messages to the <see cref="ContextManager"/>.
+    /// Status changes are reported to the driver through the <see cref="HeartBeatManager"/>.
+    /// </summary>
+    public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime));
+
+        private readonly string _evaluatorId;
+
+        private readonly ContextManager _contextManager;
+
+        private readonly HeartBeatManager _heartBeatManager;
+
+        private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+        private readonly IClock _clock;
+
+        // Guards runtime start. A dedicated object replaces the previous lock on the evaluator id string:
+        // strings have weak identity and this instance is shared with other components (settings, heartbeat manager).
+        private readonly object _runtimeStartLock = new object();
+
+        private State _state = State.INIT;
+
+        private IDisposable _evaluatorControlChannel;
+
+        /// <summary>
+        /// Wires the runtime to the driver channel and schedules the first heartbeat immediately.
+        /// </summary>
+        /// <param name="contextManager">Manager of the evaluator's context stack.</param>
+        /// <param name="heartBeatManager">Heartbeat sender; also the source of the evaluator settings.</param>
+        [Inject]
+        public EvaluatorRuntime(
+            ContextManager contextManager,
+            HeartBeatManager heartBeatManager)
+        {
+            using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime"))
+            {
+                _clock = heartBeatManager.EvaluatorSettings.RuntimeClock;
+                _heartBeatManager = heartBeatManager;
+                _contextManager = contextManager;
+                _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId;
+                _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager;
+
+                ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver();
+
+                // subscribe to driver proto message
+                driverObserver.Subscribe(o => OnNext(o.Message));
+
+                // register the driver observer
+                _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver);
+
+                // start the heartbeat
+                _clock.ScheduleAlarm(0, heartBeatManager);
+            }
+        }
+
+        /// <summary>
+        /// Gets the current evaluator state.
+        /// </summary>
+        public State State
+        {
+            get
+            {
+                return _state;
+            }
+        }
+
+        /// <summary>
+        /// Handles a control message from the driver. Messages addressed to another evaluator, or received
+        /// while not RUNNING, fail the evaluator. A context control message is forwarded to the context manager;
+        /// once the context stack is empty the evaluator becomes DONE, reports it and disposes the clock.
+        /// A kill request moves the evaluator to KILLED and disposes the clock.
+        /// </summary>
+        /// <param name="message">The control message.</param>
+        public void Handle(EvaluatorControlProto message)
+        {
+            // Locks the heartbeat manager, the same monitor its OnNext overloads use.
+            lock (_heartBeatManager)
+            {
+                LOGGER.Log(Level.Info, "Handle Evaluator control message");
+
+                // Static string.Equals: a null identifier is reported as a mismatch instead of throwing NullReferenceException.
+                if (!string.Equals(message.identifier, _evaluatorId, StringComparison.OrdinalIgnoreCase))
+                {
+                    Handle(new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId)));
+                }
+                else if (_state != State.RUNNING)
+                {
+                    Handle(new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state)));
+                }
+                else
+                {
+                    if (message.context_control != null)
+                    {
+                        LOGGER.Log(Level.Info, "Send task control message to ContextManager");
+                        try
+                        {
+                            _contextManager.HandleTaskControl(message.context_control);
+                            if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING)
+                            {
+                                LOGGER.Log(Level.Info, "Context stack is empty, done");
+                                _state = State.DONE;
+                                _heartBeatManager.OnNext(GetEvaluatorStatus());
+                                _clock.Dispose();
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                            Handle(e);
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER);
+                        }
+                    }
+                    if (message.kill_evaluator != null)
+                    {
+                        LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId));
+                        _state = State.KILLED;
+                        _clock.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Builds a status message with the evaluator id and current state.
+        /// </summary>
+        /// <returns>A new <see cref="EvaluatorStatusProto"/>.</returns>
+        public EvaluatorStatusProto GetEvaluatorStatus()
+        {
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state));
+            EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+            {
+                evaluator_id = _evaluatorId,
+                state = _state
+            };
+            return evaluatorStatusProto;
+        }
+
+        /// <summary>
+        /// Moves the evaluator from INIT to RUNNING, starts the context manager and sends a heartbeat.
+        /// Any failure (including starting from a state other than INIT) fails the evaluator.
+        /// </summary>
+        /// <param name="runtimeStart">The runtime start event (unused).</param>
+        public void OnNext(RuntimeStart runtimeStart)
+        {
+            lock (_runtimeStartLock)
+            {
+                try
+                {
+                    LOGGER.Log(Level.Info, "Runtime start");
+                    if (_state != State.INIT)
+                    {
+                        var e = new InvalidOperationException("State should be init.");
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                    }
+                    _state = State.RUNNING;
+                    _contextManager.Start();
+                    _heartBeatManager.OnNext();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                    Handle(e);
+                }
+            }
+        }
+
+        void IObserver<RuntimeStart>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<REEFMessage>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<REEFMessage>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStop>.OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        void IObserver<RuntimeStart>.OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Shuts the runtime down. It disposes the context manager, reports DONE if the evaluator was still
+        /// RUNNING, and closes the driver control channel.
+        /// </summary>
+        /// <param name="runtimeStop">The runtime stop event (unused).</param>
+        public void OnNext(RuntimeStop runtimeStop)
+        {
+            LOGGER.Log(Level.Info, "Runtime stop");
+            _contextManager.Dispose();
+
+            if (_state == State.RUNNING)
+            {
+                _state = State.DONE;
+                _heartBeatManager.OnNext();
+            }
+            try
+            {
+                _evaluatorControlChannel.Dispose();
+            }
+            catch (Exception e)
+            {
+                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER);
+            }
+            LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete");
+        }
+
+        /// <summary>
+        /// Entry point for messages from the driver. Only messages carrying an evaluator control payload are handled.
+        /// </summary>
+        /// <param name="value">The received message; may be null.</param>
+        public void OnNext(REEFMessage value)
+        {
+            if (value != null && value.evaluatorControl != null)
+            {
+                LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl");
+                Handle(value.evaluatorControl);
+            }
+        }
+
+        /// <summary>
+        /// Marks the evaluator FAILED, sends a status carrying the error text to the driver,
+        /// and disposes the context manager.
+        /// </summary>
+        /// <param name="e">The failure cause.</param>
+        private void Handle(Exception e)
+        {
+            lock (_heartBeatManager)
+            {
+                LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e);
+                _state = State.FAILED;
+                string errorMessage = string.Format(
+                    CultureInfo.InvariantCulture,
+                    "failed with error [{0}] with mesage [{1}] and stack trace [{2}]",
+                    e,
+                    e.Message,
+                    e.StackTrace);
+                EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto()
+                {
+                    evaluator_id = _evaluatorId,
+                    error = ByteUtilities.StringToByteArrays(errorMessage),
+                    state = _state
+                };
+                _heartBeatManager.OnNext(evaluatorStatusProto);
+                _contextManager.Dispose();
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
new file mode 100644
index 0000000..a9b436b
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor
license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Time;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator
+{
+    // TODO: merge with EvaluatorConfigurations class
+
+    /// <summary>
+    /// Settings of a single evaluator: identifiers, heartbeat parameters, root context configuration and
+    /// the runtime services (clock, remote manager, injector). Only <see cref="OperationState"/> and
+    /// <see cref="NameClient"/> can change after construction.
+    /// </summary>
+    public class EvaluatorSettings
+    {
+        private readonly string _applicationId;
+        private readonly string _evaluatorId;
+        private readonly int _heartBeatPeriodInMs;
+        private readonly int _maxHeartbeatRetries;
+        private readonly ContextConfiguration _rootContextConfig;
+        private readonly IClock _clock;
+        private readonly IRemoteManager<REEFMessage> _remoteManager;
+        private readonly IInjector _injector;
+
+        // Mutable after construction.
+        private EvaluatorOperationState _operationState;
+        private INameClient _nameClient;
+
+        /// <summary>
+        /// Creates the settings; the operation state starts as OPERATIONAL.
+        /// </summary>
+        /// <param name="applicationId">Application identifier (not validated).</param>
+        /// <param name="evaluatorId">Evaluator identifier; must not be null, empty or whitespace.</param>
+        /// <param name="heartbeatPeriodInMs">Period between regular heartbeats, in milliseconds.</param>
+        /// <param name="maxHeartbeatRetries">Number of heartbeat failures tolerated; exposed as <see cref="MaxHeartbeatFailures"/>.</param>
+        /// <param name="rootContextConfig">Root context configuration; required.</param>
+        /// <param name="clock">Runtime clock; required.</param>
+        /// <param name="remoteManager">Remote manager for driver communication; required.</param>
+        /// <param name="injecor">Injector; required.</param>
+        /// <exception cref="ArgumentNullException">A required argument is null, or <paramref name="evaluatorId"/> is blank.</exception>
+        public EvaluatorSettings(
+            string applicationId,
+            string evaluatorId,
+            int heartbeatPeriodInMs,
+            int maxHeartbeatRetries,
+            ContextConfiguration rootContextConfig,
+            IClock clock,
+            IRemoteManager<REEFMessage> remoteManager,
+            IInjector injecor)
+        {
+            // A blank evaluator id is reported the same way as a missing one.
+            if (string.IsNullOrWhiteSpace(evaluatorId))
+            {
+                throw new ArgumentNullException("evaluatorId");
+            }
+
+            RequireNonNull(rootContextConfig, "rootContextConfig");
+            RequireNonNull(clock, "clock");
+            RequireNonNull(remoteManager, "remoteManager");
+            RequireNonNull(injecor, "injecor");
+
+            _applicationId = applicationId;
+            _evaluatorId = evaluatorId;
+            _heartBeatPeriodInMs = heartbeatPeriodInMs;
+            _maxHeartbeatRetries = maxHeartbeatRetries;
+            _rootContextConfig = rootContextConfig;
+            _clock = clock;
+            _remoteManager = remoteManager;
+            _injector = injecor;
+            _operationState = EvaluatorOperationState.OPERATIONAL;
+        }
+
+        /// <summary>Gets or sets whether the evaluator is operational or recovering.</summary>
+        public EvaluatorOperationState OperationState
+        {
+            get { return _operationState; }
+            set { _operationState = value; }
+        }
+
+        /// <summary>Gets the evaluator identifier.</summary>
+        public string EvalutorId
+        {
+            get { return _evaluatorId; }
+        }
+
+        /// <summary>Gets the heartbeat period in milliseconds.</summary>
+        public int HeartBeatPeriodInMs
+        {
+            get { return _heartBeatPeriodInMs; }
+        }
+
+        /// <summary>Gets the application identifier.</summary>
+        public string ApplicationId
+        {
+            get { return _applicationId; }
+        }
+
+        /// <summary>Gets the value passed as <c>maxHeartbeatRetries</c>.</summary>
+        public int MaxHeartbeatFailures
+        {
+            get { return _maxHeartbeatRetries; }
+        }
+
+        /// <summary>Gets the root context configuration.</summary>
+        public ContextConfiguration RootContextConfig
+        {
+            get { return _rootContextConfig; }
+        }
+
+        /// <summary>Gets the runtime clock.</summary>
+        public IClock RuntimeClock
+        {
+            get { return _clock; }
+        }
+
+        /// <summary>Gets or sets the name client (null until assigned).</summary>
+        public INameClient NameClient
+        {
+            get { return _nameClient; }
+            set { _nameClient = value; }
+        }
+
+        /// <summary>Gets the remote manager.</summary>
+        public IRemoteManager<REEFMessage> RemoteManager
+        {
+            get { return _remoteManager; }
+        }
+
+        /// <summary>Gets the injector.</summary>
+        public IInjector Injector
+        {
+            get { return _injector; }
+        }
+
+        /// <summary>
+        /// Throws <see cref="ArgumentNullException"/> for <paramref name="parameterName"/> when <paramref name="value"/> is null.
+        /// </summary>
+        private static void RequireNonNull(object value, string parameterName)
+        {
+            if (value == null)
+            {
+                throw new ArgumentNullException(parameterName);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
new file mode 100644
index 0000000..6a6e287
--- /dev/null
+++
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using System.Threading;
+using Org.Apache.REEF.Common.Evaluator;
+using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
+using Org.Apache.REEF.Utilities;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Wake.Remote.Impl;
+using Org.Apache.REEF.Wake.Time;
+using Org.Apache.REEF.Wake.Time.Event;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator
+{
+    /// <summary>
+    /// Builds evaluator heartbeats and sends them to the driver. It sends them on demand through the
+    /// <c>OnNext</c> overloads and periodically via clock alarms (see <see cref="OnNext(Alarm)"/>).
+    /// After <c>MaxHeartbeatFailures</c> send failures while a task is running, the manager enters RECOVERY
+    /// mode. In that mode heartbeats are queued and the driver is polled through an <see cref="IDriverConnection"/>.
+    /// Once the driver is found again, the queued heartbeats are replayed.
+    /// </summary>
+    /// <remarks>
+    /// The <c>OnNext</c> overloads lock on this instance. <c>EvaluatorRuntime</c> also locks on the same
+    /// instance, so the monitor is intentionally shared and must not be replaced by a private lock object.
+    /// </remarks>
+    public class HeartBeatManager : IObserver<Alarm>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager));
+
+        private static readonly MachineStatus MachineStatus = new MachineStatus();
+
+        // Reference point for heartbeat timestamps: milliseconds since 1970-01-01 UTC, matching the Java implementation.
+        private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+
+        private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+        private readonly IClock _clock;
+
+        private readonly int _heartBeatPeriodInMillSeconds;
+
+        private readonly int _maxHeartbeatRetries = 0;
+
+        private readonly string _evaluatorId;
+
+        // the queue can only contain the following:
+        // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state
+        // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat)
+        // It is also used as the lock object for Send() and for replaying heartbeats in Recover().
+        private readonly Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>();
+
+        private IRemoteIdentifier _remoteId;
+
+        private IObserver<REEFMessage> _observer;
+
+        private int _heartbeatFailures = 0;
+
+        // Injected only when entering RECOVERY mode; null until then.
+        private IDriverConnection _driverConnection;
+
+        private EvaluatorSettings _evaluatorSettings;
+
+        /// <summary>
+        /// Creates the manager and connects an observer to the driver endpoint <paramref name="remoteId"/>.
+        /// </summary>
+        /// <param name="settings">Evaluator settings (remote manager, clock, heartbeat period and retry limit).</param>
+        /// <param name="remoteId">Identifier of the driver endpoint.</param>
+        public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId)
+        {
+            using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager"))
+            {
+                _remoteManager = settings.RemoteManager;
+                _remoteId = remoteId;
+                _evaluatorId = settings.EvalutorId;
+                _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+                _clock = settings.RuntimeClock;
+                _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
+                _maxHeartbeatRetries = settings.MaxHeartbeatFailures;
+                EvaluatorSettings = settings;
+                MachineStatus.ToString(); // kick start the CPU perf counter
+            }
+        }
+
+        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+        public EvaluatorRuntime _evaluatorRuntime { get; set; }
+
+        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+        public ContextManager _contextManager { get; set; }
+
+        /// <summary>
+        /// Gets the evaluator settings supplied at construction.
+        /// </summary>
+        public EvaluatorSettings EvaluatorSettings
+        {
+            get
+            {
+                return _evaluatorSettings;
+            }
+
+            private set
+            {
+                _evaluatorSettings = value;
+            }
+        }
+
+        /// <summary>
+        /// Sends a heartbeat to the driver, or queues it when in RECOVERY mode.
+        /// A failed send is queued. If no task is running, the failure is passed to Exceptions.Throw,
+        /// because recovery is not supported in that case. Reaching the retry limit switches to RECOVERY mode.
+        /// </summary>
+        /// <param name="evaluatorHeartbeatProto">The heartbeat to deliver.</param>
+        public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+        {
+            lock (_queuedHeartbeats)
+            {
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
+                {
+                    LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto));
+                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+                    return;
+                }
+
+                // NOT during recovery, try to send
+                REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto);
+                try
+                {
+                    _observer.OnNext(payload);
+                    _heartbeatFailures = 0; // reset failure counts if we are having intermittent (not continuous) failures
+                }
+                catch (Exception e)
+                {
+                    if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING)
+                    {
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER);
+                    }
+
+                    _heartbeatFailures++;
+
+                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+                    LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
+
+                    if (_heartbeatFailures >= _maxHeartbeatRetries)
+                    {
+                        LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures));
+                        try
+                        {
+                            _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>();
+                        }
+                        catch (Exception ex)
+                        {
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER);
+                        }
+                        LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
+                        _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
+
+                        // clean heartbeat failure
+                        _heartbeatFailures = 0;
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Assemble a complete new heartbeat and send it out.
+        /// </summary>
+        public void OnNext()
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto();
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+                Send(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific TaskStatus that must be delivered to the driver
+        /// </summary>
+        /// <param name="taskStatusProto">Task status to include instead of the context manager's current one.</param>
+        public void OnNext(TaskStatusProto taskStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
+                    _evaluatorRuntime.GetEvaluatorStatus(),
+                    _contextManager.GetContextStatusCollection(),
+                    Optional<TaskStatusProto>.Of(taskStatusProto));
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+                Send(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific ContextStatusProto that must be delivered to the driver
+        /// </summary>
+        /// <param name="contextStatusProto">Context status placed first, ahead of the context manager's statuses; no task status is included.</param>
+        public void OnNext(ContextStatusProto contextStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
+                List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto>();
+                contextStatusProtos.Add(contextStatusProto);
+                contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection());
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
+                    _evaluatorRuntime.GetEvaluatorStatus(),
+                    contextStatusProtos,
+                    Optional<TaskStatusProto>.Empty());
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+                Send(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific EvaluatorStatus that must be delivered to the driver
+        /// </summary>
+        /// <param name="evaluatorStatusProto">Evaluator status; the heartbeat carries no context or task status.</param>
+        public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
+                EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto()
+                {
+                    timestamp = CurrentTimeMilliSeconds(),
+                    evaluator_status = evaluatorStatusProto
+                };
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+                Send(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Periodic tick. When OPERATIONAL and the runtime is RUNNING, a regular heartbeat is sent.
+        /// Otherwise the regular heartbeat is skipped; in RECOVERY mode the driver is polled for reconnection.
+        /// In both cases the next alarm is scheduled.
+        /// </summary>
+        /// <param name="value">The alarm (unused).</param>
+        public void OnNext(Alarm value)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)");
+            lock (this)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING)
+                {
+                    EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
+                    Send(evaluatorHeartbeatProto);
+                    _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
+                }
+                else
+                {
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState, _evaluatorRuntime.State));
+
+                    // Only poll the driver while recovering. _driverConnection is injected when RECOVERY is
+                    // entered, so polling outside RECOVERY (e.g. an alarm while the runtime is still INIT)
+                    // would only raise and log a NullReferenceException on every tick.
+                    if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY && _driverConnection != null)
+                    {
+                        try
+                        {
+                            DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId);
+                            if (driverInformation == null)
+                            {
+                                LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
+                            }
+                            else
+                            {
+                                LOGGER.Log(
+                                    Level.Info,
+                                    string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
+                                Recover(driverInformation);
+                            }
+                        }
+                        catch (Exception e)
+                        {
+                            // we do not want any exception to stop the query for driver status
+                            Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
+                        }
+                    }
+                    _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
+                }
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Current time as milliseconds since 1970-01-01 UTC; chosen to be compatible with the Java implementation.
+        /// </summary>
+        private static long CurrentTimeMilliSeconds()
+        {
+            return (long)(DateTime.UtcNow - UnixEpoch).TotalMilliseconds;
+        }
+
+        /// <summary>
+        /// Re-points the heartbeat observer (and the name client, if any) at the restarted driver.
+        /// It then replays every queued heartbeat, the first one flagged as a recovery heartbeat,
+        /// and returns to OPERATIONAL mode.
+        /// </summary>
+        /// <param name="driverInformation">Endpoints of the restarted driver.</param>
+        private void Recover(DriverInformation driverInformation)
+        {
+            IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
+            _remoteId = new SocketRemoteIdentifier(driverEndpoint);
+            _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+            lock (_evaluatorSettings)
+            {
+                if (_evaluatorSettings.NameClient != null)
+                {
+                    try
+                    {
+                        LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server" + driverInformation.NameServerId);
+                        _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
+                        LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
+                    }
+                    catch (Exception e)
+                    {
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                    }
+                }
+            }
+
+            lock (_queuedHeartbeats)
+            {
+                bool firstHeartbeatInQueue = true;
+                while (_queuedHeartbeats.Any())
+                {
+                    LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
+                    try
+                    {
+                        if (firstHeartbeatInQueue)
+                        {
+                            // first heartbeat is specially constructed to include the recovery flag
+                            EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
+                            LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
+                            _observer.OnNext(new REEFMessage(recoveryHeartbeat));
+                            firstHeartbeatInQueue = false;
+                        }
+                        else
+                        {
+                            _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue()));
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        // we do not handle failures during RECOVERY
+                        Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
+                            e,
+                            Level.Error,
+                            string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId),
+                            LOGGER);
+                    }
+
+                    // NOTE(review): sleeps while holding the queue lock (and, via OnNext(Alarm), this instance's lock).
+                    Thread.Sleep(500);
+                }
+            }
+            _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
+            LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
+        }
+
+        /// <summary>
+        /// Sets the recovery flag on the heartbeat, its context statuses and, when present, its task status.
+        /// </summary>
+        /// <param name="heartbeat">Heartbeat to mark; modified in place.</param>
+        /// <returns>The same heartbeat instance.</returns>
+        private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
+        {
+            heartbeat.recovery = true;
+            heartbeat.context_status.ForEach(c => c.recovery = true);
+
+            // Heartbeats such as those built by OnNext(EvaluatorStatusProto) carry no task status.
+            if (heartbeat.task_status != null)
+            {
+                heartbeat.task_status.recovery = true;
+            }
+            return heartbeat;
+        }
+
+        /// <summary>
+        /// Builds a heartbeat from the current evaluator, context and task status.
+        /// </summary>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
+        {
+            return GetEvaluatorHeartbeatProto(
+                _evaluatorRuntime.GetEvaluatorStatus(),
+                _contextManager.GetContextStatusCollection(),
+                _contextManager.GetTaskStatus());
+        }
+
+        /// <summary>
+        /// Builds a timestamped heartbeat from the given parts; the task status is set only when present.
+        /// </summary>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
+            EvaluatorStatusProto evaluatorStatusProto,
+            ICollection<ContextStatusProto> contextStatusProtos,
+            Optional<TaskStatusProto> taskStatusProto)
+        {
+            EvaluatorHeartbeatProto evaluatorHeartbeatProto = new EvaluatorHeartbeatProto()
+            {
+                timestamp = CurrentTimeMilliSeconds(),
+                evaluator_status = evaluatorStatusProto
+            };
+            foreach (ContextStatusProto contextStatusProto in contextStatusProtos)
+            {
+                evaluatorHeartbeatProto.context_status.Add(contextStatusProto);
+            }
+            if (taskStatusProto.IsPresent())
+            {
+                evaluatorHeartbeatProto.task_status = taskStatusProto.Value;
+            }
+            return evaluatorHeartbeatProto;
+        }
+    }
+}
