http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs new file mode 100644 index 0000000..3a0b474 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Tang.Interface; +using System; + +namespace Org.Apache.REEF.Common.Context +{ + public class ContextClientCodeException : Exception + { + private readonly string _contextId; + private readonly Optional<string> _parentId; + + /// <summary> + /// construt the exception that caused the error + /// </summary> + /// <param name="contextId"> the id of the failed context.</param> + /// <param name="parentId"> the id of the failed context's parent, if any.</param> + /// <param name="message"> the error message </param> + /// <param name="cause"> the exception that caused the error</param> + public ContextClientCodeException( + string contextId, + Optional<string> parentId, + string message, + Exception cause) + : base("Failure in context '" + contextId + "': " + message, cause) + { + _contextId = contextId; + _parentId = parentId; + } + + public string ContextId + { + get { return _contextId; } + } + + public Optional<string> ParentId + { + get { return _parentId; } + } + + /// <summary> + /// Extracts a context id from the given configuration. + /// </summary> + /// <param name="c"></param> + /// <returns>the context id in the given configuration.</returns> + public static string GetId(IConfiguration c) + { + // TODO: update after TANG is available + return string.Empty; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs new file mode 100644 index 0000000..bcd7fb0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Interface; +using System; +using System.Collections.Generic; +using System.Globalization; +using System.IO; +using Org.Apache.REEF.Tang.Types; + +namespace Org.Apache.REEF.Common.Evaluator.Context +{ + public class ContextConfiguration : IConfiguration + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration)); + + private Dictionary<string, string> _settings; + + public ContextConfiguration(string configString) + { + using (LOGGER.LogFunction("ContextConfiguration::ContextConfigurationn")) + { + ContainerDirectory = Directory.GetCurrentDirectory(); + + _settings = new Dictionary<string, string>(); + AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString); + foreach (ConfigurationEntry config in avroConfiguration.Bindings) + { + if (config.key.Contains(REEF.Evaluator.Constants.ContextIdentifier)) + { + config.key = REEF.Evaluator.Constants.ContextIdentifier; + LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", config.key, config.value)); + } + _settings.Add(config.key, config.value); + } + if (!_settings.ContainsKey(REEF.Evaluator.Constants.ContextIdentifier)) + { + string msg = "Required parameter ContextIdentifier not provided."; + LOGGER.Log(Level.Error, msg); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER); + } + } + } + + public string Id + { + get { return _settings[REEF.Evaluator.Constants.ContextIdentifier]; } + } + + public string ContainerDirectory { get; set; } + + public IConfigurationBuilder newBuilder() + { + throw new NotImplementedException(); + } + + public string GetNamedParameter(INamedParameterNode np) + { + throw new NotImplementedException(); + } + + public IClassHierarchy GetClassHierarchy() + { + throw new NotImplementedException(); + } + + public ISet<object> GetBoundSet(INamedParameterNode np) + { + throw new NotImplementedException(); + } + + public IClassNode GetBoundConstructor(IClassNode cn) + { + throw new NotImplementedException(); + } + + public IClassNode GetBoundImplementation(IClassNode cn) + { + throw new NotImplementedException(); + } + + public IConstructorDef GetLegacyConstructor(IClassNode cn) + { + throw new NotImplementedException(); + } + + public ICollection<IClassNode> GetBoundImplementations() + { + throw new NotImplementedException(); + } + + public ICollection<IClassNode> GetBoundConstructors() + { + throw new NotImplementedException(); + } + + public ICollection<INamedParameterNode> GetNamedParameters() + { + throw new NotImplementedException(); + } + + public ICollection<IClassNode> GetLegacyConstructors() + { + throw new NotImplementedException(); + } + + public IList<object> GetBoundList(INamedParameterNode np) + { + throw new NotImplementedException(); + } + + public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets() + { + throw new NotImplementedException(); + } + + public IDictionary<INamedParameterNode, IList<object>> GetBoundList() + { + throw new NotImplementedException(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs new file mode 100644 index 0000000..97e65c0 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Events; +using System; +using System.Collections.Generic; + +namespace Org.Apache.REEF.Common.Context +{ + /// <summary> + /// This class is used to trigger all the context life-cycle dependent events. + /// </summary> + class ContextLifeCycle + { + private HashSet<IObserver<IContextStart>> _contextStartHandlers; + + private HashSet<IObserver<IContextStop>> _contextStopHandlers; + + private HashSet<IContextMessageSource> _contextMessageSources; + + // @Inject + public ContextLifeCycle( + string id, + HashSet<IObserver<IContextStart>> contextStartHandlers, + HashSet<IObserver<IContextStop>> contextStopHandlers, + HashSet<IContextMessageSource> contextMessageSources) + { + Id = id; + _contextStartHandlers = contextStartHandlers; + _contextStopHandlers = contextStopHandlers; + _contextMessageSources = contextMessageSources; + } + + public ContextLifeCycle(string contextId) + { + Id = contextId; + _contextStartHandlers = new HashSet<IObserver<IContextStart>>(); + _contextStopHandlers = new HashSet<IObserver<IContextStop>>(); + _contextMessageSources = new HashSet<IContextMessageSource>(); + } + + public string Id { get; private set; } + + public HashSet<IContextMessageSource> ContextMessageSources + { + get { return _contextMessageSources; } + } + + /// <summary> + /// Fires ContextStart to all registered event handlers. + /// </summary> + public void Start() + { + IContextStart contextStart = new ContextStartImpl(Id); + + // TODO: enable + //foreach (IObserver<IContextStart> startHandler in _contextStartHandlers) + //{ + // startHandler.OnNext(contextStart); + //} + } + + /// <summary> + /// Fires ContextStop to all registered event handlers. + /// </summary> + public void Close() + { + //IContextStop contextStop = new ContextStopImpl(Id); + //foreach (IObserver<IContextStop> startHandler in _contextStopHandlers) + //{ + // startHandler.OnNext(contextStop); + //} + } + + public void HandleContextMessage(byte[] message) + { + //contextMessageHandler.onNext(message); + } + + /// <summary> + /// get the set of ContextMessageSources configured + /// </summary> + /// <returns>(a shallow copy of) the set of ContextMessageSources configured.</returns> + public HashSet<IContextMessageSource> GetContextMessageSources() + { + return new HashSet<IContextMessageSource>(_contextMessageSources); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs new file mode 100644 index 0000000..7c4d288 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs @@ -0,0 +1,362 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Evaluator.Context; +using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto; +using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; +using Org.Apache.REEF.Common.Task; +using Org.Apache.REEF.Evaluator; +using Org.Apache.REEF.Services; +using Org.Apache.REEF.Tasks; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Globalization; +using System.Linq; + +namespace Org.Apache.REEF.Common.Context +{ + public class ContextManager : IDisposable + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager)); + + private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>(); + + private readonly HeartBeatManager _heartBeatManager; + + private RootContextLauncher _rootContextLauncher; + + public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig) + { + using (LOGGER.LogFunction("ContextManager::ContextManager")) + { + _heartBeatManager = heartBeatManager; + _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig); + } + } + + /// <summary> + /// Start the context manager. This initiates the root context. + /// </summary> + public void Start() + { + lock (_contextStack) + { + ContextRuntime rootContext = _rootContextLauncher.GetRootContext(); + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id)); + _contextStack.Push(rootContext); + + if (_rootContextLauncher.RootTaskConfig.IsPresent()) + { + LOGGER.Log(Level.Info, "Launching the initial Task"); + try + { + _contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager); + } + catch (TaskClientCodeException e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER); + HandleTaskException(e); + } + } + } + } + + public bool ContextStackIsEmpty() + { + lock (_contextStack) + { + return (_contextStack.Count == 0); + } + } + + // TODO: codes here are slightly different from java since the protobuf.net does not generate the HasXXX method, may want to switch to proto-port later + + /// <summary> + /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts. + /// This also triggers the HeartBeatManager to send a heartbeat with the result of this operation. + /// </summary> + /// <param name="controlMessage"></param> + public void HandleTaskControl(ContextControlProto controlMessage) + { + try + { + byte[] message = controlMessage.task_message; + if (controlMessage.add_context != null && controlMessage.remove_context != null) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER); + } + if (controlMessage.add_context != null) + { + LOGGER.Log(Level.Info, "AddContext"); + AddContext(controlMessage.add_context); + // support submitContextAndTask() + if (controlMessage.start_task != null) + { + LOGGER.Log(Level.Info, "StartTask"); + StartTask(controlMessage.start_task); + } + else + { + // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime + // Therefore this call can not go into addContext + LOGGER.Log(Level.Info, "Trigger Heartbeat"); + _heartBeatManager.OnNext(); + } + } + else if (controlMessage.remove_context != null) + { + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id)); + RemoveContext(controlMessage.remove_context.context_id); + } + else if (controlMessage.start_task != null) + { + LOGGER.Log(Level.Info, "StartTask only"); + StartTask(controlMessage.start_task); + } + else if (controlMessage.stop_task != null) + { + LOGGER.Log(Level.Info, "CloseTask"); + _contextStack.Peek().CloseTask(message); + } + else if (controlMessage.suspend_task != null) + { + LOGGER.Log(Level.Info, "SuspendTask"); + _contextStack.Peek().SuspendTask(message); + } + else if (controlMessage.task_message != null) + { + LOGGER.Log(Level.Info, "DeliverTaskMessage"); + _contextStack.Peek().DeliverTaskMessage(message); + } + else if (controlMessage.context_message != null) + { + LOGGER.Log(Level.Info, "Handle context contol message"); + ContextMessageProto contextMessageProto = controlMessage.context_message; + bool deliveredMessage = false; + foreach (ContextRuntime context in _contextStack) + { + if (context.Id.Equals(contextMessageProto.context_id)) + { + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message)); + context.HandleContextMessaage(controlMessage.context_message.message); + deliveredMessage = true; + break; + } + } + if (!deliveredMessage) + { + InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id)); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + } + else + { + InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString())); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + } + catch (Exception e) + { + if (e is TaskClientCodeException) + { + HandleTaskException((TaskClientCodeException)e); + } + else if (e is ContextClientCodeException) + { + HandlContextException((ContextClientCodeException)e); + } + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER); + } + } + + /// <summary> + /// Get TaskStatusProto of the currently running task, if there is any + /// </summary> + /// <returns>the TaskStatusProto of the currently running task, if there is any</returns> + public Optional<TaskStatusProto> GetTaskStatus() + { + if (_contextStack.Count == 0) + { + return Optional<TaskStatusProto>.Empty(); + + //throw new InvalidOperationException("Asked for an Task status while there isn't even a context running."); + } + return _contextStack.Peek().GetTaskStatus(); + } + + /// <summary> + /// get status of all contexts in the stack. + /// </summary> + /// <returns>the status of all contexts in the stack.</returns> + public ICollection<ContextStatusProto> GetContextStatusCollection() + { + ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>(); + foreach (ContextRuntime runtime in _contextStack) + { + ContextStatusProto contextStatusProto = runtime.GetContextStatus(); + LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto)); + result.Add(contextStatusProto); + } + return result; + } + + /// <summary> + /// Shuts down. This forecefully kills the Task if there is one and then shuts down all Contexts on the stack, + /// starting at the top. + /// </summary> + public void Dispose() + { + lock (_contextStack) + { + if (_contextStack != null && _contextStack.Any()) + { + LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime."); + ContextRuntime runtime = _contextStack.Last(); + if (runtime != null) + { + runtime.Dispose(); + } + } + } + } + + /// <summary> + /// Add a context to the stack. + /// </summary> + /// <param name="addContextProto"></param> + private void AddContext(AddContextProto addContextProto) + { + lock (_contextStack) + { + ContextRuntime currentTopContext = _contextStack.Peek(); + if (!currentTopContext.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase)) + { + var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}", + addContextProto.parent_context_id, + currentTopContext.Id)); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + string contextConfigString = addContextProto.context_configuration; + ContextConfiguration contextConfiguration = new ContextConfiguration(contextConfigString); + ContextRuntime newTopContext; + if (addContextProto.service_configuration != null) + { + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration); + newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig); + } + else + { + newTopContext = currentTopContext.SpawnChildContext(contextConfiguration); + } + _contextStack.Push(newTopContext); + } + } + + /// <summary> + /// Remove the context with the given ID from the stack. + /// </summary> + /// <param name="contextId"> context id</param> + private void RemoveContext(string contextId) + { + lock (_contextStack) + { + string currentTopContextId = _contextStack.Peek().Id; + if (!contextId.Equals(_contextStack.Peek().Id, StringComparison.OrdinalIgnoreCase)) + { + var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContextId)); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + _contextStack.Peek().Dispose(); + if (_contextStack.Count > 1) + { + // We did not close the root context. Therefore, we need to inform the + // driver explicitly that this context is closed. The root context notification + // is implicit in the Evaluator close/done notification. + _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state + } + _contextStack.Pop(); + } + // System.gc(); // TODO: garbage collect? + } + + /// <summary> + /// Launch an Task. + /// </summary> + /// <param name="startTaskProto"></param> + private void StartTask(StartTaskProto startTaskProto) + { + lock (_contextStack) + { + ContextRuntime currentActiveContext = _contextStack.Peek(); + string expectedContextId = startTaskProto.context_id; + if (!expectedContextId.Equals(currentActiveContext.Id, StringComparison.OrdinalIgnoreCase)) + { + var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id)); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration); + currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager); + } + } + + /// <summary> + /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager + /// </summary> + /// <param name="e"></param> + private void HandleTaskException(TaskClientCodeException e) + { + LOGGER.Log(Level.Error, "TaskClientCodeException", e); + byte[] exception = ByteUtilities.StringToByteArrays(e.ToString()); + TaskStatusProto taskStatus = new TaskStatusProto() + { + context_id = e.ContextId, + task_id = e.TaskId, + result = exception, + state = State.FAILED + }; + LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeatb for a failed task: {0}", taskStatus.ToString())); + _heartBeatManager.OnNext(taskStatus); + } + + /// <summary> + /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager + /// </summary> + /// <param name="e"></param> + private void HandlContextException(ContextClientCodeException e) + { + LOGGER.Log(Level.Error, "ContextClientCodeException", e); + byte[] exception = ByteUtilities.StringToByteArrays(e.ToString()); + ContextStatusProto contextStatusProto = new ContextStatusProto() + { + context_id = e.ContextId, + context_state = ContextStatusProto.State.FAIL, + error = exception + }; + if (e.ParentId.IsPresent()) + { + contextStatusProto.parent_id = e.ParentId.Value; + } + LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString())); + _heartBeatManager.OnNext(contextStatusProto); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs new file mode 100644 index 0000000..9ed7a5c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs @@ -0,0 +1,478 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Evaluator.Context; +using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; +using Org.Apache.REEF.Common.Task; +using Org.Apache.REEF.Tasks; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Interface; +using System; +using System.Globalization; + +namespace Org.Apache.REEF.Common.Context +{ + public class ContextRuntime + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime)); + // Context-local injector. This contains information that will not be available in child injectors. + private readonly IInjector _contextInjector; + //// Service injector. State in this injector moves to child injectors. + private readonly IInjector _serviceInjector; + + // Convenience class to hold all the event handlers for the context as well as the service instances. + private readonly ContextLifeCycle _contextLifeCycle; + + // The child context, if any. + private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty(); + + // The parent context, if any. + private Optional<ContextRuntime> _parentContext = Optional<ContextRuntime>.Empty(); + + // The currently running task, if any. + private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty(); + + private ContextStatusProto.State _contextState = ContextStatusProto.State.READY; + + /// <summary> + /// Create a new ContextRuntime. + /// </summary> + /// <param name="serviceInjector"></param> + /// <param name="contextConfiguration">the Configuration for this context.</param> + /// <param name="parentContext"></param> + public ContextRuntime( + IInjector serviceInjector, + IConfiguration contextConfiguration, + Optional<ContextRuntime> parentContext) + { + ContextConfiguration config = contextConfiguration as ContextConfiguration; + if (config == null) + { + var e = new ArgumentException("contextConfiguration is not of type ContextConfiguration"); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + _contextLifeCycle = new ContextLifeCycle(config.Id); + _serviceInjector = serviceInjector; + _parentContext = parentContext; + try + { + _contextInjector = serviceInjector.ForkInjector(); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); + + Optional<string> parentId = ParentContext.IsPresent() ? + Optional<string>.Of(ParentContext.Value.Id) : + Optional<string>.Empty(); + ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e); + + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); + } + // Trigger the context start events on contextInjector. + _contextLifeCycle.Start(); + } + + /// <summary> + /// Create a new ContextRuntime for the root context. + /// </summary> + /// <param name="serviceInjector"> </param> the serviceInjector to be used. + /// <param name="contextConfiguration"> the Configuration for this context.</param> + public ContextRuntime( + IInjector serviceInjector, + IConfiguration contextConfiguration) + : this(serviceInjector, contextConfiguration, Optional<ContextRuntime>.Empty()) + { + LOGGER.Log(Level.Info, "Instantiating root context"); + } + + public string Id + { + get { return _contextLifeCycle.Id; } + } + + public Optional<ContextRuntime> ParentContext + { + get { return _parentContext; } + } + + /// <summary> + /// Spawns a new context. + /// The new context will have a serviceInjector that is created by forking the one in this object with the given + /// serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector. + /// </summary> + /// <param name="contextConfiguration">the new context's context (local) Configuration.</param> + /// <param name="serviceConfiguration">the new context's service Configuration.</param> + /// <returns>a child context.</returns> + public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration, IConfiguration serviceConfiguration) + { + ContextRuntime childContext = null; + lock (_contextLifeCycle) + { + if (_task.IsPresent()) + { + var e = new InvalidOperationException( + string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + if (_childContext.IsPresent()) + { + var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + try + { + IInjector childServiceInjector = _serviceInjector.ForkInjector(new IConfiguration[] { serviceConfiguration }); + childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this)); + _childContext = Optional<ContextRuntime>.Of(childContext); + return childContext; + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); + + Optional<string> parentId = ParentContext.IsPresent() ? + Optional<string>.Of(ParentContext.Value.Id) : + Optional<string>.Empty(); + ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e); + + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); + } + } + return childContext; + } + + /// <summary> + /// Spawns a new context without services of its own. + /// The new context will have a serviceInjector that is created by forking the one in this object. The + /// contextConfiguration is used to fork the contextInjector from that new serviceInjector. + /// </summary> + /// <param name="contextConfiguration">the new context's context (local) Configuration.</param> + /// <returns> a child context.</returns> + public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration) + { + lock (_contextLifeCycle) + { + if (_task.IsPresent()) + { + var e = new InvalidOperationException( + string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + if (_childContext.IsPresent()) + { + var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + IInjector childServiceInjector = _serviceInjector.ForkInjector(); + ContextRuntime childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this)); + _childContext = Optional<ContextRuntime>.Of(childContext); + return childContext; + } + } + + /// <summary> + /// Launches an Task on this context. + /// </summary> + /// <param name="taskConfiguration"></param> + /// <param name="contextId"></param> + /// <param name="heartBeatManager"></param> + public void StartTask(TaskConfiguration taskConfiguration, string contextId, HeartBeatManager heartBeatManager) + { + lock (_contextLifeCycle) + { + bool taskPresent = _task.IsPresent(); + bool taskEnded = taskPresent && _task.Value.HasEnded(); + + LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration)" + "task is present: " + taskPresent + " task has ended: " + taskEnded); + if (taskPresent) + { + LOGGER.Log(Level.Info, "Task state: " + _task.Value.GetTaskState()); + } + + if (taskEnded) + { + // clean up state + _task = Optional<TaskRuntime>.Empty(); + taskPresent = false; + } + if (taskPresent) + { + var e = new InvalidOperationException( + string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + if (_childContext.IsPresent()) + { + var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + try + { + IInjector taskInjector = _contextInjector.ForkInjector(new IConfiguration[] { taskConfiguration.TangConfig }); + LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration.ToString()); + TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); // taskInjector.getInstance(TaskRuntime.class); + taskRuntime.Initialize(); + System.Threading.Tasks.Task.Run(new Action(taskRuntime.Start)); + _task = Optional<TaskRuntime>.Of(taskRuntime); + } + catch (Exception e) + { + var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER); + } + } + } + + /// <summary> + /// Close this context. If there is a child context, this recursively closes it before closing this context. If + /// there is an Task currently running, that will be closed. + /// </summary> + public void Dispose() + { + lock (_contextLifeCycle) + { + _contextState = ContextStatusProto.State.DONE; + if (_task.IsPresent()) + { + LOGGER.Log(Level.Warning, "Shutting down an task because the underlying context is being closed."); + _task.Value.Close(null); + } + if (_childContext.IsPresent()) + { + LOGGER.Log(Level.Warning, "Closing a context because its parent context is being closed."); + _childContext.Value.Dispose(); + } + _contextLifeCycle.Close(); + if (_parentContext.IsPresent()) + { + ParentContext.Value.ResetChildContext(); + } + } + } + + /// <summary> + /// Issue a suspend call to the Task + /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING + /// in the log. + /// </summary> + /// <param name="message"> the suspend message to deliver or null if there is none.</param> + public void SuspendTask(byte[] message) + { + lock (_contextLifeCycle) + { + if (!_task.IsPresent()) + { + LOGGER.Log(Level.Warning, "Received a suspend task while there was no task running. Ignored"); + } + else + { + _task.Value.Suspend(message); + } + } + } + + /// <summary> + /// Issue a close call to the Task + /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING + /// in the log. + /// </summary> + /// <param name="message">the close message to deliver or null if there is none.</param> + public void CloseTask(byte[] message) + { + lock (_contextLifeCycle) + { + if (!_task.IsPresent()) + { + LOGGER.Log(Level.Warning, "Received a close task while there was no task running. Ignored"); + } + else + { + _task.Value.Close(message); + } + } + } + + /// <summary> + /// Deliver a message to the Task + /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING + /// in the log. + /// </summary> + /// <param name="message">the message to deliver or null if there is none.</param> + public void DeliverTaskMessage(byte[] message) + { + lock (_contextLifeCycle) + { + if (!_task.IsPresent()) + { + LOGGER.Log(Level.Warning, "Received an task message while there was no task running. Ignored"); + } + else + { + _task.Value.Deliver(message); + } + } + } + + public void HandleContextMessaage(byte[] mesage) + { + _contextLifeCycle.HandleContextMessage(mesage); + } + + /// <summary> + /// get state of the running Task + /// </summary> + /// <returns> the state of the running Task, if one is running.</returns> + public Optional<TaskStatusProto> GetTaskStatus() + { + lock (_contextLifeCycle) + { + if (_task.IsPresent()) + { + if (_task.Value.HasEnded()) + { + _task = Optional<TaskRuntime>.Empty(); + return Optional<TaskStatusProto>.Empty(); + } + else + { + TaskStatusProto taskStatusProto = _task.Value.GetStatusProto(); + if (taskStatusProto.state == State.RUNNING) + { + // only RUNNING status is allowed to rurn here, all other state pushed out to heartbeat + return Optional<TaskStatusProto>.Of(taskStatusProto); + } + var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task state must be RUNNING, but instead is in {0} state", taskStatusProto.state)); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + return Optional<TaskStatusProto>.Empty(); + } + } + else + { + return Optional<TaskStatusProto>.Empty(); + } + } + } + + /// <summary> + /// Reset child context when parent is being closed + /// </summary> + public void ResetChildContext() + { + lock (_contextLifeCycle) + { + if (_childContext.IsPresent()) + { + _childContext = Optional<ContextRuntime>.Empty(); + } + else + { + var e = new InvalidOperationException("no child context set"); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + } + } + + /// <summary> + /// get context's status in protocol buffer + /// </summary> + /// <returns>this context's status in protocol buffer form.</returns> + public ContextStatusProto GetContextStatus() + { + lock (_contextLifeCycle) + { + ContextStatusProto contextStatusProto = new ContextStatusProto() + { + context_id = Id, + context_state = _contextState, + }; + if (_parentContext.IsPresent()) + { + contextStatusProto.parent_id = _parentContext.Value.Id; + } + + foreach (IContextMessageSource source in _contextLifeCycle.ContextMessageSources) + { + Optional<ContextMessage> contextMessageOptional = source.Message; + if (contextMessageOptional.IsPresent()) + { + ContextStatusProto.ContextMessageProto contextMessageProto + = new ContextStatusProto.ContextMessageProto() + { + source_id = contextMessageOptional.Value.MessageSourceId, + }; + contextMessageProto.message = ByteUtilities.CopyBytesFrom(contextMessageOptional.Value.Bytes); + contextStatusProto.context_message.Add(contextMessageProto); + } + } + return contextStatusProto; + } + } + } +} + ///// <summary> + ///// TODO: remove and use parameterless GetContextStatus above + ///// </summary> + ///// <returns>this context's status in protocol buffer form.</returns> + //public ContextStatusProto GetContextStatus(string contextId) + //{ + // ContextStatusProto contextStatusProto = new ContextStatusProto() + // { + // context_id = contextId, + // context_state = _contextState, + // }; + // return contextStatusProto; + //} + + ////// TODO: remove and use injection + //public void StartTask(ITask task, HeartBeatManager heartBeatManager, string taskId, string contextId) + //{ + // lock (_contextLifeCycle) + // { + // if (_task.IsPresent() && _task.Value.HasEnded()) + // { + // // clean up state + // _task = Optional<TaskRuntime>.Empty(); + // } + // if (_task.IsPresent()) + // { + // throw new InvalidOperationException( + // string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here + // } + // if (_childContext.IsPresent()) + // { + // throw new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); + // } + // try + // { + // // final Injector taskInjector = contextInjector.forkInjector(taskConfiguration); + // TaskRuntime taskRuntime // taskInjector.getInstance(TaskRuntime.class); + // = new TaskRuntime(task, heartBeatManager); + // LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Starting task '{0}'", taskId)); + // taskRuntime.Initialize(taskId, contextId); + // taskRuntime.Start(); + // _task = Optional<TaskRuntime>.Of(taskRuntime); + // } + // catch (Exception e) + // { + // throw new InvalidOperationException("Unable to instantiate the new task"); + // } + // } + //} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs new file mode 100644 index 0000000..7c62a0b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Events; + +namespace Org.Apache.REEF.Common.Context +{ + class ContextStartImpl : IContextStart + { + public ContextStartImpl(string id) + { + Id = id; + } + + public string Id { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs new file mode 100644 index 0000000..4df40b6 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Events; + +namespace Org.Apache.REEF.Common.Context +{ + class ContextStopImpl : IContextStop + { + public ContextStopImpl(string id) + { + Id = id; + } + + public string Id { get; set; } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs new file mode 100644 index 0000000..e7daecb --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.Evaluator.Context; +using Org.Apache.REEF.Services; +using Org.Apache.REEF.Tasks; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Implementations; +using Org.Apache.REEF.Tang.Interface; +using System; +using System.Globalization; +using Org.Apache.REEF.Tang.Implementations.Tang; + +namespace Org.Apache.REEF.Common.Context +{ + /// <summary> + /// Helper class that encapsulates the root context configuration: With or without services and an initial task. + /// </summary> + public sealed class RootContextLauncher + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(RootContextLauncher)); + + private readonly IInjector _rootServiceInjector = null; + + private ContextRuntime _rootContext = null; + + private ContextConfiguration _rootContextConfiguration = null; + + public RootContextLauncher(ContextConfiguration rootContextConfig, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig) + { + _rootContextConfiguration = rootContextConfig; + _rootServiceInjector = InjectServices(rootServiceConfig); + RootTaskConfig = rootTaskConfig; + } + + public Optional<TaskConfiguration> RootTaskConfig { get; set; } + + public ContextConfiguration RootContextConfig + { + get { return _rootContextConfiguration; } + set { _rootContextConfiguration = value; } + } + + public ContextRuntime GetRootContext() + { + if (_rootContext == null) + { + _rootContext = GetRootContext(_rootServiceInjector, _rootContextConfiguration); + } + return _rootContext; + } + + private IInjector InjectServices(Optional<ServiceConfiguration> serviceConfig) + { + IInjector rootServiceInjector; + + if (serviceConfig.IsPresent()) + { + rootServiceInjector = TangFactory.GetTang().NewInjector(serviceConfig.Value.TangConfig); + InjectedServices services = null; + try + { + services = rootServiceInjector.GetInstance<InjectedServices>(); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Failed to instantiate service.", LOGGER); + InvalidOperationException ex = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Failed to inject service: encoutned error {1} with message [{0}] and stack trace:[{1}]", e, e.Message, e.StackTrace)); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); + } + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "injected {0} service(s)", services.Services.Count)); + } + else + { + rootServiceInjector = TangFactory.GetTang().NewInjector(); + LOGGER.Log(Level.Info, "no service provided for injection."); + } + + return rootServiceInjector; + } + + private ContextRuntime GetRootContext( + IInjector rootServiceInjector, + IConfiguration rootContextConfiguration) + { + ContextRuntime result; + result = new ContextRuntime(rootServiceInjector, rootContextConfiguration); + return result; + } + } +} +//if (rootServiceInjector != null) +//{ +// try +// { +// rootServiceInjector = rootServiceInjector.ForkInjector(serviceConfigs); +// } +// catch (Exception e) +// { +// throw new ContextClientCodeException(ContextClientCodeException.GetId(rootContextConfiguration), +// Optional<String>.Empty(), +// "Unable to instatiate the root context", e); +// } +// result = new ContextRuntime(rootServiceInjector, rootContextConfiguration); +//} +//else +//{ +// result = new ContextRuntime(rootServiceInjector.ForkInjector(), rootContextConfiguration); +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs new file mode 100644 index 0000000..fc50c73 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Utilities; + +namespace Org.Apache.REEF.Common +{ + public class CloseEventImpl : ICloseEvent + { + public CloseEventImpl() + { + Value = Optional<byte[]>.Empty(); + } + + public CloseEventImpl(byte[] bytes) + { + Value = Optional<byte[]>.OfNullable(bytes); + } + + public Optional<byte[]> Value + { + get { return Value; } + set { value = Value; } + } + + public override string ToString() + { + return "CloseEvent{value=" + Value.ToString() + "}"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs new file mode 100644 index 0000000..2b00aa2 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Utilities; + +namespace Org.Apache.REEF.Common.Runtime.Evaluator +{ + public class DriverMessageImpl : IDriverMessage + { + private Optional<byte[]> _value; + + public DriverMessageImpl() + { + _value = Optional<byte[]>.Empty(); + } + + public DriverMessageImpl(byte[] bytes) + { + _value = Optional<byte[]>.OfNullable(bytes); + } + + public Optional<byte[]> Message + { + get + { + return _value; + } + } + + public override string ToString() + { + return "DriverMessage [value=" + ByteUtilities.ByteArrarysToString(_value.Value) + "]"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs new file mode 100644 index 0000000..a6bb52f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Utilities; + +namespace Org.Apache.REEF.Common +{ + public class SuspendEventImpl : ICloseEvent + { + public SuspendEventImpl() + { + Value = Optional<byte[]>.Empty(); + } + + public SuspendEventImpl(byte[] bytes) + { + Value = Optional<byte[]>.OfNullable(bytes); + } + + public Optional<byte[]> Value + { + get { return Value; } + set { value = Value; } + } + + public override string ToString() + { + return "SuspendEvent{value=" + Value.ToString() + "}"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs new file mode 100644 index 0000000..22bdbd3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using Org.Apache.REEF.Tang.Interface; + +namespace Org.Apache.REEF.Common.Task +{ + public class TaskClientCodeException : Exception + { + private readonly string _taskId; + + private readonly string _contextId; + + /// <summary> + /// construct the exception that caused the Task to fail + /// </summary> + /// <param name="taskId"> the id of the failed task.</param> + /// <param name="contextId"> the id of the context the failed Task was executing in.</param> + /// <param name="message"> the error message </param> + /// <param name="cause"> the exception that caused the Task to fail.</param> + public TaskClientCodeException( + string taskId, + string contextId, + string message, + Exception cause) + : base(message, cause) + { + _taskId = taskId; + _contextId = contextId; + } + + public string TaskId + { + get { return _taskId; } + } + + public string ContextId + { + get { return _contextId; } + } + + public static string GetTaskIdentifier(IConfiguration c) + { + // TODO: update after TANG is available + return string.Empty; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs new file mode 100644 index 0000000..26e638d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Wake; +using System; +using System.Collections.Generic; + +namespace Org.Apache.REEF.Common +{ + public class TaskLifeCycle + { + private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers; + private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers; + private readonly ITaskStart _taskStart; + private readonly ITaskStop _taskStop; + + // INJECT + public TaskLifeCycle( + HashSet<IObserver<ITaskStop>> taskStopHandlers, + HashSet<IObserver<ITaskStart>> taskStartHandlers, + TaskStartImpl taskStart, + TaskStopImpl taskStop) + { + _taskStartHandlers = taskStartHandlers; + _taskStopHandlers = taskStopHandlers; + _taskStart = taskStart; + _taskStop = taskStop; + } + + public TaskLifeCycle() + { + _taskStartHandlers = new HashSet<IObserver<ITaskStart>>(); + _taskStopHandlers = new HashSet<IObserver<ITaskStop>>(); + } + + public void Start() + { + foreach (IObserver<ITaskStart> startHandler in _taskStartHandlers) + { + startHandler.OnNext(_taskStart); + } + } + + public void Stop() + { + foreach (IObserver<ITaskStop> stopHandler in _taskStopHandlers) + { + stopHandler.OnNext(_taskStop); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs new file mode 100644 index 0000000..d531df7 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskRuntime.cs @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Common.io; +using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; +using Org.Apache.REEF.Common.Runtime.Evaluator; +using Org.Apache.REEF.Common.Task; +using Org.Apache.REEF.Tasks; +using Org.Apache.REEF.Tasks.Events; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Tang.Interface; +using System; +using System.Collections.Generic; +using System.Globalization; + +namespace Org.Apache.REEF.Common +{ + public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage> + { + private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime)); + + private readonly ITask _task; + + private readonly IInjector _injector; + + // The memento given by the task configuration + private readonly Optional<byte[]> _memento; + + private readonly HeartBeatManager _heartBeatManager; + + private readonly TaskStatus _currentStatus; + + private readonly INameClient _nameClient; + + public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null) + { + _injector = taskInjector; + _heartBeatManager = heartBeatManager; + + Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty(); + try + { + _task = _injector.GetInstance<ITask>(); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER); + } + try + { + ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>(); + messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource }); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER); + // do not rethrow since this is benign + } + try + { + _nameClient = _injector.GetInstance<INameClient>(); + _heartBeatManager.EvaluatorSettings.NameClient = _nameClient; + } + catch (InjectionException) + { + LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration."); + // do not rethrow since user is not required to provide name client + } + + LOGGER.Log(Level.Info, "task message source injected"); + _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources); + _memento = memento == null ? + Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento)); + } + + public string TaskId + { + get { return _currentStatus.TaskId; } + } + + public string ContextId + { + get { return _currentStatus.ContextId; } + } + + public void Initialize() + { + _currentStatus.SetRunning(); + } + + /// <summary> + /// Run the task + /// </summary> + public void Start() + { + try + { + LOGGER.Log(Level.Info, "Call Task"); + if (_currentStatus.IsNotRunning()) + { + var e = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State); + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); + } + byte[] result; + byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null; + System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento)); + try + { + runTask.Start(); + runTask.Wait(); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Exception thrown during task running.", LOGGER); + } + result = runTask.Result; + + LOGGER.Log(Level.Info, "Task Call Finished"); + if (_task != null) + { + _task.Dispose(); + } + _currentStatus.SetResult(result); + if (result != null && result.Length > 0) + { + LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result)); + } + } + catch (Exception e) + { + if (_task != null) + { + _task.Dispose(); + } + LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e)); + _currentStatus.SetException(e); + } + } + + public TaskState GetTaskState() + { + return _currentStatus.State; + } + + /// <summary> + /// Called by heartbeat manager + /// </summary> + /// <returns> current TaskStatusProto </returns> + public TaskStatusProto GetStatusProto() + { + return _currentStatus.ToProto(); + } + + public bool HasEnded() + { + return _currentStatus.HasEnded(); + } + + /// <summary> + /// get ID of the task. + /// </summary> + /// <returns>ID of the task.</returns> + public string GetActicityId() + { + return _currentStatus.TaskId; + } + + public void Close(byte[] message) + { + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId)); + if (_currentStatus.IsNotRunning()) + { + LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State)); + } + else + { + try + { + OnNext(new CloseEventImpl(message)); + _currentStatus.SetCloseRequested(); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER); + + _currentStatus.SetException( + new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e)); + } + } + } + + public void Suspend(byte[] message) + { + LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId)); + + if (_currentStatus.IsNotRunning()) + { + LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State)); + } + else + { + try + { + OnNext(new SuspendEventImpl(message)); + _currentStatus.SetSuspendRequested(); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER); + _currentStatus.SetException( + new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e)); + } + } + } + + public void Deliver(byte[] message) + { + if (_currentStatus.IsNotRunning()) + { + LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State)); + } + else + { + try + { + OnNext(new DriverMessageImpl(message)); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER); + _currentStatus.SetException( + new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e)); + } + } + } + + public void OnNext(ICloseEvent value) + { + LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)"); + // TODO: send a heartbeat + } + + void IObserver<ICloseEvent>.OnError(Exception error) + { + throw new NotImplementedException(); + } + + void IObserver<IDriverMessage>.OnCompleted() + { + throw new NotImplementedException(); + } + + void IObserver<IDriverMessage>.OnError(Exception error) + { + throw new NotImplementedException(); + } + + void IObserver<ISuspendEvent>.OnCompleted() + { + throw new NotImplementedException(); + } + + void IObserver<ISuspendEvent>.OnError(Exception error) + { + throw new NotImplementedException(); + } + + void IObserver<ICloseEvent>.OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnNext(ISuspendEvent value) + { + LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)"); + // TODO: send a heartbeat + } + + public void OnNext(IDriverMessage value) + { + IDriverMessageHandler messageHandler = null; + LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)"); + try + { + messageHandler = _injector.GetInstance<IDriverMessageHandler>(); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER); + } + if (messageHandler != null) + { + try + { + messageHandler.Handle(value); + } + catch (Exception e) + { + Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER); + _currentStatus.RecordExecptionWithoutHeartbeat(e); + } + } + } + + private byte[] RunTask(byte[] memento) + { + return _task.Call(memento); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs new file mode 100644 index 0000000..ad8002b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskStartImpl.cs @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tasks.Events; + +namespace Org.Apache.REEF.Common +{ + public class TaskStartImpl : ITaskStart + { + //INJECT + public TaskStartImpl(string id) + { + Id = id; + } + + public string Id { get; set; } + } +}
