http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs deleted file mode 100644 index 9ed7a5c..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextRuntime.cs +++ /dev/null @@ -1,478 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Evaluator.Context; -using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; -using Org.Apache.REEF.Common.Task; -using Org.Apache.REEF.Tasks; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Interface; -using System; -using System.Globalization; - -namespace Org.Apache.REEF.Common.Context -{ - public class ContextRuntime - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime)); - // Context-local injector. This contains information that will not be available in child injectors. 
- private readonly IInjector _contextInjector; - //// Service injector. State in this injector moves to child injectors. - private readonly IInjector _serviceInjector; - - // Convenience class to hold all the event handlers for the context as well as the service instances. - private readonly ContextLifeCycle _contextLifeCycle; - - // The child context, if any. - private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty(); - - // The parent context, if any. - private Optional<ContextRuntime> _parentContext = Optional<ContextRuntime>.Empty(); - - // The currently running task, if any. - private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty(); - - private ContextStatusProto.State _contextState = ContextStatusProto.State.READY; - - /// <summary> - /// Create a new ContextRuntime. - /// </summary> - /// <param name="serviceInjector"></param> - /// <param name="contextConfiguration">the Configuration for this context.</param> - /// <param name="parentContext"></param> - public ContextRuntime( - IInjector serviceInjector, - IConfiguration contextConfiguration, - Optional<ContextRuntime> parentContext) - { - ContextConfiguration config = contextConfiguration as ContextConfiguration; - if (config == null) - { - var e = new ArgumentException("contextConfiguration is not of type ContextConfiguration"); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - _contextLifeCycle = new ContextLifeCycle(config.Id); - _serviceInjector = serviceInjector; - _parentContext = parentContext; - try - { - _contextInjector = serviceInjector.ForkInjector(); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); - - Optional<string> parentId = ParentContext.IsPresent() ? 
- Optional<string>.Of(ParentContext.Value.Id) : - Optional<string>.Empty(); - ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e); - - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); - } - // Trigger the context start events on contextInjector. - _contextLifeCycle.Start(); - } - - /// <summary> - /// Create a new ContextRuntime for the root context. - /// </summary> - /// <param name="serviceInjector"> </param> the serviceInjector to be used. - /// <param name="contextConfiguration"> the Configuration for this context.</param> - public ContextRuntime( - IInjector serviceInjector, - IConfiguration contextConfiguration) - : this(serviceInjector, contextConfiguration, Optional<ContextRuntime>.Empty()) - { - LOGGER.Log(Level.Info, "Instantiating root context"); - } - - public string Id - { - get { return _contextLifeCycle.Id; } - } - - public Optional<ContextRuntime> ParentContext - { - get { return _parentContext; } - } - - /// <summary> - /// Spawns a new context. - /// The new context will have a serviceInjector that is created by forking the one in this object with the given - /// serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector. 
- /// </summary> - /// <param name="contextConfiguration">the new context's context (local) Configuration.</param> - /// <param name="serviceConfiguration">the new context's service Configuration.</param> - /// <returns>a child context.</returns> - public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration, IConfiguration serviceConfiguration) - { - ContextRuntime childContext = null; - lock (_contextLifeCycle) - { - if (_task.IsPresent()) - { - var e = new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - if (_childContext.IsPresent()) - { - var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - try - { - IInjector childServiceInjector = _serviceInjector.ForkInjector(new IConfiguration[] { serviceConfiguration }); - childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this)); - _childContext = Optional<ContextRuntime>.Of(childContext); - return childContext; - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); - - Optional<string> parentId = ParentContext.IsPresent() ? - Optional<string>.Of(ParentContext.Value.Id) : - Optional<string>.Empty(); - ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e); - - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); - } - } - return childContext; - } - - /// <summary> - /// Spawns a new context without services of its own. 
- /// The new context will have a serviceInjector that is created by forking the one in this object. The - /// contextConfiguration is used to fork the contextInjector from that new serviceInjector. - /// </summary> - /// <param name="contextConfiguration">the new context's context (local) Configuration.</param> - /// <returns> a child context.</returns> - public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration) - { - lock (_contextLifeCycle) - { - if (_task.IsPresent()) - { - var e = new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - if (_childContext.IsPresent()) - { - var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - IInjector childServiceInjector = _serviceInjector.ForkInjector(); - ContextRuntime childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this)); - _childContext = Optional<ContextRuntime>.Of(childContext); - return childContext; - } - } - - /// <summary> - /// Launches an Task on this context. 
- /// </summary> - /// <param name="taskConfiguration"></param> - /// <param name="contextId"></param> - /// <param name="heartBeatManager"></param> - public void StartTask(TaskConfiguration taskConfiguration, string contextId, HeartBeatManager heartBeatManager) - { - lock (_contextLifeCycle) - { - bool taskPresent = _task.IsPresent(); - bool taskEnded = taskPresent && _task.Value.HasEnded(); - - LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration)" + "task is present: " + taskPresent + " task has ended: " + taskEnded); - if (taskPresent) - { - LOGGER.Log(Level.Info, "Task state: " + _task.Value.GetTaskState()); - } - - if (taskEnded) - { - // clean up state - _task = Optional<TaskRuntime>.Empty(); - taskPresent = false; - } - if (taskPresent) - { - var e = new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - if (_childContext.IsPresent()) - { - var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - try - { - IInjector taskInjector = _contextInjector.ForkInjector(new IConfiguration[] { taskConfiguration.TangConfig }); - LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration.ToString()); - TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); // taskInjector.getInstance(TaskRuntime.class); - taskRuntime.Initialize(); - System.Threading.Tasks.Task.Run(new Action(taskRuntime.Start)); - _task = Optional<TaskRuntime>.Of(taskRuntime); - } - catch (Exception e) - { - var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e); - 
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER); - } - } - } - - /// <summary> - /// Close this context. If there is a child context, this recursively closes it before closing this context. If - /// there is an Task currently running, that will be closed. - /// </summary> - public void Dispose() - { - lock (_contextLifeCycle) - { - _contextState = ContextStatusProto.State.DONE; - if (_task.IsPresent()) - { - LOGGER.Log(Level.Warning, "Shutting down an task because the underlying context is being closed."); - _task.Value.Close(null); - } - if (_childContext.IsPresent()) - { - LOGGER.Log(Level.Warning, "Closing a context because its parent context is being closed."); - _childContext.Value.Dispose(); - } - _contextLifeCycle.Close(); - if (_parentContext.IsPresent()) - { - ParentContext.Value.ResetChildContext(); - } - } - } - - /// <summary> - /// Issue a suspend call to the Task - /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING - /// in the log. - /// </summary> - /// <param name="message"> the suspend message to deliver or null if there is none.</param> - public void SuspendTask(byte[] message) - { - lock (_contextLifeCycle) - { - if (!_task.IsPresent()) - { - LOGGER.Log(Level.Warning, "Received a suspend task while there was no task running. Ignored"); - } - else - { - _task.Value.Suspend(message); - } - } - } - - /// <summary> - /// Issue a close call to the Task - /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING - /// in the log. - /// </summary> - /// <param name="message">the close message to deliver or null if there is none.</param> - public void CloseTask(byte[] message) - { - lock (_contextLifeCycle) - { - if (!_task.IsPresent()) - { - LOGGER.Log(Level.Warning, "Received a close task while there was no task running. 
Ignored"); - } - else - { - _task.Value.Close(message); - } - } - } - - /// <summary> - /// Deliver a message to the Task - /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING - /// in the log. - /// </summary> - /// <param name="message">the message to deliver or null if there is none.</param> - public void DeliverTaskMessage(byte[] message) - { - lock (_contextLifeCycle) - { - if (!_task.IsPresent()) - { - LOGGER.Log(Level.Warning, "Received an task message while there was no task running. Ignored"); - } - else - { - _task.Value.Deliver(message); - } - } - } - - public void HandleContextMessaage(byte[] mesage) - { - _contextLifeCycle.HandleContextMessage(mesage); - } - - /// <summary> - /// get state of the running Task - /// </summary> - /// <returns> the state of the running Task, if one is running.</returns> - public Optional<TaskStatusProto> GetTaskStatus() - { - lock (_contextLifeCycle) - { - if (_task.IsPresent()) - { - if (_task.Value.HasEnded()) - { - _task = Optional<TaskRuntime>.Empty(); - return Optional<TaskStatusProto>.Empty(); - } - else - { - TaskStatusProto taskStatusProto = _task.Value.GetStatusProto(); - if (taskStatusProto.state == State.RUNNING) - { - // only RUNNING status is allowed to rurn here, all other state pushed out to heartbeat - return Optional<TaskStatusProto>.Of(taskStatusProto); - } - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task state must be RUNNING, but instead is in {0} state", taskStatusProto.state)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - return Optional<TaskStatusProto>.Empty(); - } - } - else - { - return Optional<TaskStatusProto>.Empty(); - } - } - } - - /// <summary> - /// Reset child context when parent is being closed - /// </summary> - public void ResetChildContext() - { - lock (_contextLifeCycle) - { - if (_childContext.IsPresent()) - { - _childContext = 
Optional<ContextRuntime>.Empty(); - } - else - { - var e = new InvalidOperationException("no child context set"); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - } - } - - /// <summary> - /// get context's status in protocol buffer - /// </summary> - /// <returns>this context's status in protocol buffer form.</returns> - public ContextStatusProto GetContextStatus() - { - lock (_contextLifeCycle) - { - ContextStatusProto contextStatusProto = new ContextStatusProto() - { - context_id = Id, - context_state = _contextState, - }; - if (_parentContext.IsPresent()) - { - contextStatusProto.parent_id = _parentContext.Value.Id; - } - - foreach (IContextMessageSource source in _contextLifeCycle.ContextMessageSources) - { - Optional<ContextMessage> contextMessageOptional = source.Message; - if (contextMessageOptional.IsPresent()) - { - ContextStatusProto.ContextMessageProto contextMessageProto - = new ContextStatusProto.ContextMessageProto() - { - source_id = contextMessageOptional.Value.MessageSourceId, - }; - contextMessageProto.message = ByteUtilities.CopyBytesFrom(contextMessageOptional.Value.Bytes); - contextStatusProto.context_message.Add(contextMessageProto); - } - } - return contextStatusProto; - } - } - } -} - ///// <summary> - ///// TODO: remove and use parameterless GetContextStatus above - ///// </summary> - ///// <returns>this context's status in protocol buffer form.</returns> - //public ContextStatusProto GetContextStatus(string contextId) - //{ - // ContextStatusProto contextStatusProto = new ContextStatusProto() - // { - // context_id = contextId, - // context_state = _contextState, - // }; - // return contextStatusProto; - //} - - ////// TODO: remove and use injection - //public void StartTask(ITask task, HeartBeatManager heartBeatManager, string taskId, string contextId) - //{ - // lock (_contextLifeCycle) - // { - // if (_task.IsPresent() && _task.Value.HasEnded()) - // { - // // clean up state - // _task = 
Optional<TaskRuntime>.Empty(); - // } - // if (_task.IsPresent()) - // { - // throw new InvalidOperationException( - // string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here - // } - // if (_childContext.IsPresent()) - // { - // throw new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); - // } - // try - // { - // // final Injector taskInjector = contextInjector.forkInjector(taskConfiguration); - // TaskRuntime taskRuntime // taskInjector.getInstance(TaskRuntime.class); - // = new TaskRuntime(task, heartBeatManager); - // LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Starting task '{0}'", taskId)); - // taskRuntime.Initialize(taskId, contextId); - // taskRuntime.Start(); - // _task = Optional<TaskRuntime>.Of(taskRuntime); - // } - // catch (Exception e) - // { - // throw new InvalidOperationException("Unable to instantiate the new task"); - // } - // } - //} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs deleted file mode 100644 index 7c62a0b..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStartImpl.cs +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using Org.Apache.REEF.Common.Events; - -namespace Org.Apache.REEF.Common.Context -{ - class ContextStartImpl : IContextStart - { - public ContextStartImpl(string id) - { - Id = id; - } - - public string Id { get; set; } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs deleted file mode 100644 index 4df40b6..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextStopImpl.cs +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using Org.Apache.REEF.Common.Events; - -namespace Org.Apache.REEF.Common.Context -{ - class ContextStopImpl : IContextStop - { - public ContextStopImpl(string id) - { - Id = id; - } - - public string Id { get; set; } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs deleted file mode 100644 index e7daecb..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/RootContextLauncher.cs +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using Org.Apache.REEF.Common.Evaluator.Context; -using Org.Apache.REEF.Services; -using Org.Apache.REEF.Tasks; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Implementations; -using Org.Apache.REEF.Tang.Interface; -using System; -using System.Globalization; -using Org.Apache.REEF.Tang.Implementations.Tang; - -namespace Org.Apache.REEF.Common.Context -{ - /// <summary> - /// Helper class that encapsulates the root context configuration: With or without services and an initial task. - /// </summary> - public sealed class RootContextLauncher - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(RootContextLauncher)); - - private readonly IInjector _rootServiceInjector = null; - - private ContextRuntime _rootContext = null; - - private ContextConfiguration _rootContextConfiguration = null; - - public RootContextLauncher(ContextConfiguration rootContextConfig, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig) - { - _rootContextConfiguration = rootContextConfig; - _rootServiceInjector = InjectServices(rootServiceConfig); - RootTaskConfig = rootTaskConfig; - } - - public Optional<TaskConfiguration> RootTaskConfig { get; set; } - - public ContextConfiguration RootContextConfig - { - get { return _rootContextConfiguration; } - set { _rootContextConfiguration = value; } - } - - public ContextRuntime GetRootContext() - { - if (_rootContext == null) - { - _rootContext = GetRootContext(_rootServiceInjector, _rootContextConfiguration); - } - return _rootContext; - } - - private IInjector InjectServices(Optional<ServiceConfiguration> serviceConfig) - { - IInjector rootServiceInjector; - - if (serviceConfig.IsPresent()) - { - rootServiceInjector = TangFactory.GetTang().NewInjector(serviceConfig.Value.TangConfig); - InjectedServices services = null; - try - { - services = rootServiceInjector.GetInstance<InjectedServices>(); - } - catch (Exception e) - { - 
Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Failed to instantiate service.", LOGGER); - InvalidOperationException ex = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Failed to inject service: encoutned error {1} with message [{0}] and stack trace:[{1}]", e, e.Message, e.StackTrace)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); - } - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "injected {0} service(s)", services.Services.Count)); - } - else - { - rootServiceInjector = TangFactory.GetTang().NewInjector(); - LOGGER.Log(Level.Info, "no service provided for injection."); - } - - return rootServiceInjector; - } - - private ContextRuntime GetRootContext( - IInjector rootServiceInjector, - IConfiguration rootContextConfiguration) - { - ContextRuntime result; - result = new ContextRuntime(rootServiceInjector, rootContextConfiguration); - return result; - } - } -} -//if (rootServiceInjector != null) -//{ -// try -// { -// rootServiceInjector = rootServiceInjector.ForkInjector(serviceConfigs); -// } -// catch (Exception e) -// { -// throw new ContextClientCodeException(ContextClientCodeException.GetId(rootContextConfiguration), -// Optional<String>.Empty(), -// "Unable to instatiate the root context", e); -// } -// result = new ContextRuntime(rootServiceInjector, rootContextConfiguration); -//} -//else -//{ -// result = new ContextRuntime(rootServiceInjector.ForkInjector(), rootContextConfiguration); -//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs deleted file mode 100644 index fc50c73..0000000 --- 
a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/CloseEventImpl.cs +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tasks.Events; -using Org.Apache.REEF.Utilities; - -namespace Org.Apache.REEF.Common -{ - public class CloseEventImpl : ICloseEvent - { - public CloseEventImpl() - { - Value = Optional<byte[]>.Empty(); - } - - public CloseEventImpl(byte[] bytes) - { - Value = Optional<byte[]>.OfNullable(bytes); - } - - public Optional<byte[]> Value - { - get { return Value; } - set { value = Value; } - } - - public override string ToString() - { - return "CloseEvent{value=" + Value.ToString() + "}"; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs deleted file mode 100644 index 2b00aa2..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/DriverMessageImpl.cs +++ /dev/null @@ -1,52 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tasks.Events; -using Org.Apache.REEF.Utilities; - -namespace Org.Apache.REEF.Common.Runtime.Evaluator -{ - public class DriverMessageImpl : IDriverMessage - { - private Optional<byte[]> _value; - - public DriverMessageImpl() - { - _value = Optional<byte[]>.Empty(); - } - - public DriverMessageImpl(byte[] bytes) - { - _value = Optional<byte[]>.OfNullable(bytes); - } - - public Optional<byte[]> Message - { - get - { - return _value; - } - } - - public override string ToString() - { - return "DriverMessage [value=" + ByteUtilities.ByteArrarysToString(_value.Value) + "]"; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs deleted file mode 100644 index a6bb52f..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/SuspendEventImpl.cs +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tasks.Events; -using Org.Apache.REEF.Utilities; - -namespace Org.Apache.REEF.Common -{ - public class SuspendEventImpl : ICloseEvent - { - public SuspendEventImpl() - { - Value = Optional<byte[]>.Empty(); - } - - public SuspendEventImpl(byte[] bytes) - { - Value = Optional<byte[]>.OfNullable(bytes); - } - - public Optional<byte[]> Value - { - get { return Value; } - set { value = Value; } - } - - public override string ToString() - { - return "SuspendEvent{value=" + Value.ToString() + "}"; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs deleted file mode 100644 index 22bdbd3..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskClientCodeException.cs +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using Org.Apache.REEF.Tang.Interface; - -namespace Org.Apache.REEF.Common.Task -{ - public class TaskClientCodeException : Exception - { - private readonly string _taskId; - - private readonly string _contextId; - - /// <summary> - /// construct the exception that caused the Task to fail - /// </summary> - /// <param name="taskId"> the id of the failed task.</param> - /// <param name="contextId"> the id of the context the failed Task was executing in.</param> - /// <param name="message"> the error message </param> - /// <param name="cause"> the exception that caused the Task to fail.</param> - public TaskClientCodeException( - string taskId, - string contextId, - string message, - Exception cause) - : base(message, cause) - { - _taskId = taskId; - _contextId = contextId; - } - - public string TaskId - { - get { return _taskId; } - } - - public string ContextId - { - get { return _contextId; } - } - - public static string GetTaskIdentifier(IConfiguration c) - { - // TODO: update after TANG is available - return string.Empty; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs deleted file mode 100644 index 26e638d..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/task/TaskLifeCycle.cs +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
using System;
using System.Collections.Generic;
using Org.Apache.REEF.Tasks.Events;
using Org.Apache.REEF.Wake;

namespace Org.Apache.REEF.Common
{
    /// <summary>
    /// Holds the task start/stop observers and notifies them with the
    /// corresponding start/stop event objects.
    /// </summary>
    public class TaskLifeCycle
    {
        private readonly HashSet<IObserver<ITaskStop>> _taskStopHandlers;
        private readonly HashSet<IObserver<ITaskStart>> _taskStartHandlers;
        private readonly ITaskStart _taskStart;
        private readonly ITaskStop _taskStop;

        // INJECT
        public TaskLifeCycle(
            HashSet<IObserver<ITaskStop>> taskStopHandlers,
            HashSet<IObserver<ITaskStart>> taskStartHandlers,
            TaskStartImpl taskStart,
            TaskStopImpl taskStop)
        {
            _taskStopHandlers = taskStopHandlers;
            _taskStartHandlers = taskStartHandlers;
            _taskStop = taskStop;
            _taskStart = taskStart;
        }

        /// <summary>
        /// Creates a life cycle with empty handler sets and no start/stop event objects.
        /// </summary>
        public TaskLifeCycle()
            : this(
                new HashSet<IObserver<ITaskStop>>(),
                new HashSet<IObserver<ITaskStart>>(),
                null,
                null)
        {
        }

        /// <summary>Passes the start event to every registered start handler.</summary>
        public void Start()
        {
            foreach (var observer in _taskStartHandlers)
            {
                observer.OnNext(_taskStart);
            }
        }

        /// <summary>Passes the stop event to every registered stop handler.</summary>
        public void Stop()
        {
            foreach (var observer in _taskStopHandlers)
            {
                observer.OnNext(_taskStop);
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Common.io;
using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
using Org.Apache.REEF.Common.Runtime.Evaluator;
using Org.Apache.REEF.Common.Task;
using Org.Apache.REEF.Tang.Exceptions;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tasks;
using Org.Apache.REEF.Tasks.Events;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common
{
    /// <summary>
    /// Hosts a single injected <see cref="ITask"/>: runs it, records its outcome in a
    /// <see cref="TaskStatus"/>, and accepts close, suspend and driver-message requests.
    /// </summary>
    public class TaskRuntime : IObserver<ICloseEvent>, IObserver<ISuspendEvent>, IObserver<IDriverMessage>
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskRuntime));

        private readonly ITask _task;

        private readonly IInjector _injector;

        // The memento given by the task configuration
        private readonly Optional<byte[]> _memento;

        private readonly HeartBeatManager _heartBeatManager;

        private readonly TaskStatus _currentStatus;

        private readonly INameClient _nameClient;

        /// <summary>
        /// Injects the task (required), its message source (optional) and a name client (optional),
        /// then creates the status tracker for the task.
        /// </summary>
        /// <param name="taskInjector">injector used to obtain the task and its optional collaborators.</param>
        /// <param name="contextId">id of the context the task runs in.</param>
        /// <param name="taskId">id of the task.</param>
        /// <param name="heartBeatManager">heartbeat manager handed to the status tracker.</param>
        /// <param name="memento">optional memento; when non-null it is converted to bytes and passed to the task.</param>
        public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager, string memento = null)
        {
            _injector = taskInjector;
            _heartBeatManager = heartBeatManager;

            Optional<ISet<ITaskMessageSource>> messageSources = Optional<ISet<ITaskMessageSource>>.Empty();
            try
            {
                _task = _injector.GetInstance<ITask>();
            }
            catch (Exception e)
            {
                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", LOGGER);
            }
            try
            {
                ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>();
                messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource>() { taskMessageSource });
            }
            catch (Exception e)
            {
                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot inject task message source with error: " + e.StackTrace, LOGGER);
                // do not rethrow since this is benign
            }
            try
            {
                _nameClient = _injector.GetInstance<INameClient>();
                _heartBeatManager.EvaluatorSettings.NameClient = _nameClient;
            }
            catch (InjectionException)
            {
                LOGGER.Log(Level.Warning, "Cannot inject name client from task configuration.");
                // do not rethrow since user is not required to provide name client
            }

            // Only report success when a message source was actually obtained; previously this
            // was logged even right after the "Cannot inject task message source" warning.
            if (messageSources.IsPresent())
            {
                LOGGER.Log(Level.Info, "task message source injected");
            }
            _currentStatus = new TaskStatus(_heartBeatManager, contextId, taskId, messageSources);
            _memento = memento == null ?
                Optional<byte[]>.Empty() : Optional<byte[]>.Of(ByteUtilities.StringToByteArrays(memento));
        }

        /// <summary>Id of the task, as held by the status tracker.</summary>
        public string TaskId
        {
            get { return _currentStatus.TaskId; }
        }

        /// <summary>Id of the context, as held by the status tracker.</summary>
        public string ContextId
        {
            get { return _currentStatus.ContextId; }
        }

        /// <summary>Asks the status tracker to move the task to the running state.</summary>
        public void Initialize()
        {
            _currentStatus.SetRunning();
        }

        /// <summary>
        /// Run the task: invokes the task on a <see cref="System.Threading.Tasks.Task{TResult}"/>, waits
        /// for it, disposes the task and records either the result or the exception in the status.
        /// </summary>
        public void Start()
        {
            try
            {
                LOGGER.Log(Level.Info, "Call Task");
                if (_currentStatus.IsNotRunning())
                {
                    var notRunning = new InvalidOperationException("TaskRuntime not in Running state, instead it is in state " + _currentStatus.State);
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(notRunning, LOGGER);
                }
                byte[] result;
                byte[] taskMemento = _memento.IsPresent() ? _memento.Value : null;
                System.Threading.Tasks.Task<byte[]> runTask = new System.Threading.Tasks.Task<byte[]>(() => RunTask(taskMemento));
                try
                {
                    runTask.Start();
                    runTask.Wait();
                }
                catch (Exception runFailure)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(runFailure, Level.Error, "Exception thrown during task running.", LOGGER);
                }
                result = runTask.Result;

                LOGGER.Log(Level.Info, "Task Call Finished");
                DisposeTask();
                _currentStatus.SetResult(result);
                if (result != null && result.Length > 0)
                {
                    LOGGER.Log(Level.Info, "Task running result:\r\n" + System.Text.Encoding.Default.GetString(result));
                }
            }
            catch (Exception e)
            {
                DisposeTask();
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Task failed caused by exception [{0}]", e));
                _currentStatus.SetException(e);
            }
        }

        /// <summary>Current state of the task as held by the status tracker.</summary>
        public TaskState GetTaskState()
        {
            return _currentStatus.State;
        }

        /// <summary>
        /// Called by heartbeat manager
        /// </summary>
        /// <returns> current TaskStatusProto </returns>
        public TaskStatusProto GetStatusProto()
        {
            return _currentStatus.ToProto();
        }

        /// <summary>Whether the status tracker reports the task as ended.</summary>
        public bool HasEnded()
        {
            return _currentStatus.HasEnded();
        }

        /// <summary>
        /// get ID of the task.
        /// </summary>
        /// <returns>ID of the task.</returns>
        public string GetActicityId()
        {
            return _currentStatus.TaskId;
        }

        /// <summary>
        /// Requests the task to close. Ignored (with a warning) unless the task is running; a failure
        /// while dispatching is recorded on the status as a <see cref="TaskClientCodeException"/>.
        /// </summary>
        /// <param name="message">payload wrapped in the close event.</param>
        public void Close(byte[] message)
        {
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to close Task {0}", TaskId));
            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to close an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    OnNext(new CloseEventImpl(message));
                    _currentStatus.SetCloseRequested();
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", LOGGER);

                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during Close().", e));
                }
            }
        }

        /// <summary>
        /// Requests the task to suspend. Ignored (with a warning) unless the task is running; a failure
        /// while dispatching is recorded on the status as a <see cref="TaskClientCodeException"/>.
        /// </summary>
        /// <param name="message">payload wrapped in the suspend event.</param>
        public void Suspend(byte[] message)
        {
            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Trying to suspend Task {0}", TaskId));

            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to supend an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    OnNext(new SuspendEventImpl(message));
                    _currentStatus.SetSuspendRequested();
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", LOGGER);
                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during Suspend().", e));
                }
            }
        }

        /// <summary>
        /// Delivers a driver message to the task. Ignored (with a warning) unless the task is running;
        /// a failure while dispatching is recorded on the status as a <see cref="TaskClientCodeException"/>.
        /// </summary>
        /// <param name="message">payload wrapped in the driver message.</param>
        public void Deliver(byte[] message)
        {
            if (_currentStatus.IsNotRunning())
            {
                LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to send a message to an task that is in {0} state. Ignored.", _currentStatus.State));
            }
            else
            {
                try
                {
                    OnNext(new DriverMessageImpl(message));
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during message delivery.", LOGGER);
                    _currentStatus.SetException(
                        new TaskClientCodeException(TaskId, ContextId, "Error during message delivery.", e));
                }
            }
        }

        /// <summary>Close-event hook; currently only logs.</summary>
        public void OnNext(ICloseEvent value)
        {
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
            // TODO: send a heartbeat
        }

        void IObserver<ICloseEvent>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<IDriverMessage>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<IDriverMessage>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<ISuspendEvent>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        void IObserver<ISuspendEvent>.OnError(Exception error)
        {
            throw new NotImplementedException();
        }

        void IObserver<ICloseEvent>.OnCompleted()
        {
            throw new NotImplementedException();
        }

        /// <summary>Suspend-event hook; currently only logs.</summary>
        public void OnNext(ISuspendEvent value)
        {
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
            // TODO: send a heartbeat
        }

        /// <summary>
        /// Injects an <see cref="IDriverMessageHandler"/> and hands it the message. An injection failure
        /// is logged and rethrown; a handler failure is logged and recorded on the status without
        /// sending a heartbeat.
        /// </summary>
        public void OnNext(IDriverMessage value)
        {
            IDriverMessageHandler messageHandler = null;
            LOGGER.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)");
            try
            {
                messageHandler = _injector.GetInstance<IDriverMessageHandler>();
            }
            catch (Exception e)
            {
                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, "Received Driver message, but unable to inject handler for driver message ", LOGGER);
            }
            if (messageHandler != null)
            {
                try
                {
                    messageHandler.Handle(value);
                }
                catch (Exception e)
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Exception throw when handling driver message: " + e, LOGGER);
                    _currentStatus.RecordExecptionWithoutHeartbeat(e);
                }
            }
        }

        // Invokes the user task with the (possibly null) memento bytes.
        private byte[] RunTask(byte[] memento)
        {
            return _task.Call(memento);
        }

        // Disposes the task if one was injected; shared by the success and failure paths of Start().
        private void DisposeTask()
        {
            if (_task != null)
            {
                _task.Dispose();
            }
        }
    }
}
using Org.Apache.REEF.Tasks.Events;

namespace Org.Apache.REEF.Common
{
    /// <summary>
    /// <see cref="ITaskStart"/> implementation that simply carries an id.
    /// </summary>
    public class TaskStartImpl : ITaskStart
    {
        private string _id;

        //INJECT
        public TaskStartImpl(string id)
        {
            _id = id;
        }

        /// <summary>The id given at construction; may be reassigned.</summary>
        public string Id
        {
            get { return _id; }
            set { _id = value; }
        }
    }
}
namespace Org.Apache.REEF.Common
{
    /// <summary>
    /// The states a task can be in. Numeric values are assigned explicitly.
    /// </summary>
    public enum TaskState
    {
        /// <summary>Value 0; also the default value of the enum.</summary>
        Init = 0,
        Running = 1,
        CloseRequested = 2,
        SuspendRequested = 3,
        Suspended = 4,
        Failed = 5,
        Done = 6,
        Killed = 7
    }
}
using System;
using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto;
using Org.Apache.REEF.Tasks;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common
{
    /// <summary>
    /// Tracks the state, result and last exception of one task, validates state transitions,
    /// and pushes status protos to the heartbeat manager.
    /// </summary>
    public class TaskStatus
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskStatus));
        private readonly TaskLifeCycle _taskLifeCycle;
        private readonly HeartBeatManager _heartBeatManager;
        private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources;

        private string _taskId;
        private string _contextId;
        private Optional<Exception> _lastException = Optional<Exception>.Empty();
        private Optional<byte[]> _result = Optional<byte[]>.Empty();
        private TaskState _state;

        /// <summary>
        /// Creates the tracker in the <see cref="TaskState.Init"/> state with a fresh, empty life cycle.
        /// </summary>
        public TaskStatus(HeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources)
        {
            _contextId = contextId;
            _taskId = taskId;
            _heartBeatManager = heartBeatManager;
            _taskLifeCycle = new TaskLifeCycle();
            _evaluatorMessageSources = evaluatorMessageSources;
            State = TaskState.Init;
        }

        /// <summary>
        /// Current state. Setting it is checked against <c>IsLegalStateTransition</c>; an illegal
        /// transition is logged and reported as an <see cref="InvalidOperationException"/>.
        /// </summary>
        public TaskState State
        {
            get
            {
                return _state;
            }

            set
            {
                if (!IsLegalStateTransition(_state, value))
                {
                    string complaint = string.Format(CultureInfo.InvariantCulture, "Illegal state transition from [{0}] to [{1}]", _state, value);
                    LOGGER.Log(Level.Error, complaint);
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(complaint), LOGGER);
                }
                else
                {
                    _state = value;
                }
            }
        }

        public string TaskId
        {
            get { return _taskId; }
        }

        public string ContextId
        {
            get { return _contextId; }
        }

        /// <summary>
        /// Records the failure, sends a heartbeat, then clears the stored exception.
        /// </summary>
        public void SetException(Exception e)
        {
            RecordExecptionWithoutHeartbeat(e);
            Heartbeat();
            _lastException = Optional<Exception>.Empty();
        }

        /// <summary>
        /// Stores the task result, moves Running/CloseRequested to Done and SuspendRequested to
        /// Suspended (other states are left alone), stops the life cycle and sends a heartbeat.
        /// </summary>
        public void SetResult(byte[] result)
        {
            _result = Optional<byte[]>.OfNullable(result);
            switch (State)
            {
                case TaskState.Running:
                case TaskState.CloseRequested:
                    State = TaskState.Done;
                    break;
                case TaskState.SuspendRequested:
                    State = TaskState.Suspended;
                    break;
            }
            _taskLifeCycle.Stop();
            Heartbeat();
        }

        /// <summary>
        /// When in Init: starts the life cycle, sends the INIT heartbeat and switches to Running.
        /// Any exception on the way is logged and recorded through <see cref="SetException"/>.
        /// Does nothing in any other state.
        /// </summary>
        public void SetRunning()
        {
            LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
            if (_state != TaskState.Init)
            {
                return;
            }

            try
            {
                _taskLifeCycle.Start();
                // Need to send an INIT heartbeat to the driver prompting it to create an RunningTask event.
                LOGGER.Log(Level.Info, "Sending task INIT heartbeat");
                Heartbeat();
                State = TaskState.Running;
            }
            catch (Exception failure)
            {
                Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(failure, Level.Error, "Cannot set task status to running.", LOGGER);
                SetException(failure);
            }
        }

        public void SetCloseRequested()
        {
            State = TaskState.CloseRequested;
        }

        public void SetSuspendRequested()
        {
            State = TaskState.SuspendRequested;
        }

        /// <summary>Switches to Killed and sends a heartbeat.</summary>
        public void SetKilled()
        {
            State = TaskState.Killed;
            Heartbeat();
        }

        public bool IsNotRunning()
        {
            return _state != TaskState.Running;
        }

        /// <summary>True when the state is Done, Suspended, Failed or Killed.</summary>
        public bool HasEnded()
        {
            return _state == TaskState.Done
                || _state == TaskState.Suspended
                || _state == TaskState.Failed
                || _state == TaskState.Killed;
        }

        /// <summary>
        /// Builds the status proto. The payload is the result if present, otherwise the last
        /// exception's string form if present, otherwise (while Running) the pending task messages.
        /// </summary>
        public TaskStatusProto ToProto()
        {
            Check();
            TaskStatusProto proto = new TaskStatusProto()
            {
                context_id = _contextId,
                task_id = _taskId,
                state = GetProtoState(),
            };

            if (_result.IsPresent())
            {
                proto.result = ByteUtilities.CopyBytesFrom(_result.Value);
                return proto;
            }

            if (_lastException.IsPresent())
            {
                byte[] errorBytes = ByteUtilities.StringToByteArrays(_lastException.Value.ToString());
                proto.result = ByteUtilities.CopyBytesFrom(errorBytes);
                return proto;
            }

            if (_state == TaskState.Running)
            {
                foreach (TaskMessage pending in GetMessages())
                {
                    proto.task_message.Add(new TaskStatusProto.TaskMessageProto()
                    {
                        source_id = pending.MessageSourceId,
                        message = ByteUtilities.CopyBytesFrom(pending.Message),
                    });
                }
            }
            return proto;
        }

        /// <summary>
        /// Keeps the first exception seen, switches to Failed and stops the life cycle,
        /// without sending a heartbeat.
        /// </summary>
        internal void RecordExecptionWithoutHeartbeat(Exception e)
        {
            if (!_lastException.IsPresent())
            {
                _lastException = Optional<Exception>.Of(e);
            }
            State = TaskState.Failed;
            _taskLifeCycle.Stop();
        }

        // Transition table: Failed and Killed are reachable from every listed source state;
        // source states without an explicit row accept any target.
        private static bool IsLegalStateTransition(TaskState? from, TaskState to)
        {
            if (!from.HasValue)
            {
                return to == TaskState.Init;
            }

            bool failedOrKilled = to == TaskState.Failed || to == TaskState.Killed;
            switch (from.Value)
            {
                case TaskState.Init:
                    return failedOrKilled
                        || to == TaskState.Init
                        || to == TaskState.Running
                        || to == TaskState.Done;
                case TaskState.Running:
                    return failedOrKilled
                        || to == TaskState.CloseRequested
                        || to == TaskState.SuspendRequested
                        || to == TaskState.Done;
                case TaskState.CloseRequested:
                    return failedOrKilled || to == TaskState.Done;
                case TaskState.SuspendRequested:
                    return failedOrKilled || to == TaskState.Suspended;
                default:
                    return true;
            }
        }

        // When both a result and an exception are stored, the exception wins: the result is dropped.
        private void Check()
        {
            if (!_result.IsPresent() || !_lastException.IsPresent())
            {
                return;
            }

            LOGGER.Log(Level.Warning, "Both task result and exception are present, the expcetion will take over. Thrown away result:" + ByteUtilities.ByteArrarysToString(_result.Value));
            State = TaskState.Failed;
            _result = Optional<byte[]>.Empty();
        }

        private void Heartbeat()
        {
            _heartBeatManager.OnNext(ToProto());
        }

        // Maps the internal state onto the proto state; both "requested" states report RUNNING.
        private State GetProtoState()
        {
            switch (_state)
            {
                case TaskState.Init:
                    return ProtoBuf.ReefServiceProto.State.INIT;
                case TaskState.Running:
                case TaskState.CloseRequested:
                case TaskState.SuspendRequested:
                    return ProtoBuf.ReefServiceProto.State.RUNNING;
                case TaskState.Done:
                    return ProtoBuf.ReefServiceProto.State.DONE;
                case TaskState.Suspended:
                    return ProtoBuf.ReefServiceProto.State.SUSPEND;
                case TaskState.Failed:
                    return ProtoBuf.ReefServiceProto.State.FAILED;
                case TaskState.Killed:
                    return ProtoBuf.ReefServiceProto.State.KILLED;
                default:
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Unknown state: " + _state), LOGGER);
                    return ProtoBuf.ReefServiceProto.State.FAILED; //this line should not be reached as default case will throw exception
            }
        }

        // Collects the messages currently offered by the registered message sources, if any.
        private ICollection<TaskMessage> GetMessages()
        {
            List<TaskMessage> collected = new List<TaskMessage>();
            if (!_evaluatorMessageSources.IsPresent())
            {
                return collected;
            }

            foreach (ITaskMessageSource source in _evaluatorMessageSources.Value)
            {
                Optional<TaskMessage> candidate = source.Message;
                if (candidate.IsPresent())
                {
                    collected.Add(candidate.Value);
                }
            }
            return collected;
        }
    }
}
using Org.Apache.REEF.Common.Task;
using Org.Apache.REEF.Tasks.Events;

namespace Org.Apache.REEF.Common
{
    /// <summary>
    /// <see cref="ITaskStop"/> implementation that simply carries an id.
    /// </summary>
    public class TaskStopImpl : ITaskStop
    {
        private string _id;

        //INJECT
        public TaskStopImpl(string id)
        {
            _id = id;
        }

        /// <summary>The id given at construction; may be reassigned.</summary>
        public string Id
        {
            get { return _id; }
            set { _id = value; }
        }
    }
}
using System;
using System.IO;
using System.Linq;
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Common
{
    /// <summary>
    /// Reads an Avro-serialized evaluator configuration file and exposes selected
    /// settings from it as lazily looked-up strings.
    /// </summary>
    public class EvaluatorConfigurations
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorConfigurations));

        private AvroConfiguration _avroConfiguration;

        private string _configFile;

        private string _applicationId;

        private string _evaluatorId;

        private string _taskConfiguration;

        private string _rootContextConfiguration;

        private string _rootServiceConfiguration;

        /// <summary>
        /// Loads the configuration from <paramref name="configFile"/>.
        /// A blank path is reported as <see cref="ArgumentNullException"/> and a missing file as
        /// <see cref="FileNotFoundException"/>, both through the project's exception helper.
        /// </summary>
        /// <param name="configFile">path of the Avro configuration file.</param>
        public EvaluatorConfigurations(string configFile)
        {
            using (LOGGER.LogFunction("EvaluatorConfigurations::EvaluatorConfigurations"))
            {
                if (string.IsNullOrWhiteSpace(configFile))
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), LOGGER);
                }
                if (!File.Exists(configFile))
                {
                    Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + configFile), LOGGER);
                }

                _configFile = configFile;
                _avroConfiguration = new AvroConfigurationSerializer().AvroDeseriaizeFromFile(_configFile);
            }
        }

        /// <summary>Task configuration setting; looked up on first access, then cached.</summary>
        public string TaskConfiguration
        {
            get
            {
                if (_taskConfiguration == null)
                {
                    _taskConfiguration = GetSettingValue(REEF.Evaluator.Constants.TaskConfiguration);
                }
                return _taskConfiguration;
            }
        }

        /// <summary>Evaluator identifier setting; looked up on first access, then cached.</summary>
        public string EvaluatorId
        {
            get
            {
                if (_evaluatorId == null)
                {
                    _evaluatorId = GetSettingValue(REEF.Evaluator.Constants.EvaluatorIdentifier);
                }
                return _evaluatorId;
            }
        }

        /// <summary>Application identifier setting; looked up on first access, then cached.</summary>
        public string ApplicationId
        {
            get
            {
                if (_applicationId == null)
                {
                    _applicationId = GetSettingValue(REEF.Evaluator.Constants.ApplicationIdentifier);
                }
                return _applicationId;
            }
        }

        /// <summary>Root context configuration setting; looked up on first access, then cached.</summary>
        public string RootContextConfiguration
        {
            get
            {
                if (_rootContextConfiguration == null)
                {
                    _rootContextConfiguration = GetSettingValue(REEF.Evaluator.Constants.RootContextConfiguration);
                }
                return _rootContextConfiguration;
            }
        }

        /// <summary>Root service configuration setting; looked up on first access, then cached.</summary>
        public string RootServiceConfiguration
        {
            get
            {
                if (_rootServiceConfiguration == null)
                {
                    _rootServiceConfiguration = GetSettingValue(REEF.Evaluator.Constants.RootServiceConfiguration);
                }
                return _rootServiceConfiguration;
            }
        }

        // Returns the value of the single binding whose key ends with settingKey
        // (case-insensitive ordinal comparison), or an empty string when there is none.
        private string GetSettingValue(string settingKey)
        {
            ConfigurationEntry match = _avroConfiguration.Bindings.SingleOrDefault(
                binding => binding.key.EndsWith(settingKey, StringComparison.OrdinalIgnoreCase));

            return match == null ? string.Empty : match.value;
        }
    }
}
namespace Org.Apache.REEF.Common
{
    /// <summary>
    /// Empty placeholder type; it declares no members.
    /// </summary>
    public class RemoteManager
    {
    }
}
- */ - -namespace Org.Apache.REEF.Services -{ - public interface IService - { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/services/ServiceConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/services/ServiceConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/services/ServiceConfiguration.cs deleted file mode 100644 index 5331709..0000000 --- a/lang/cs/Org.Apache.REEF.Common/services/ServiceConfiguration.cs +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; - -[module: SuppressMessage("StyleCop.CSharp.MaintainabilityRules", "SA1401:FieldsMustBePrivate", Justification = "static field, typical usage in configurations")] - -namespace Org.Apache.REEF.Services -{ - /// <summary> - /// Configuration module for services. The configuration created here can be passed alongside a ContextConfiguration - /// to form a context. 
Different from bindings made in the ContextConfiguration, those made here will be passed along - /// to child context. - /// </summary> - public class ServiceConfiguration : ConfigurationModuleBuilder - { - /// <summary> - /// A set of services to instantiate. All classes given here will be instantiated in the context, and their references - /// will be made available to child context and tasks. - /// </summary> - [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] - public static readonly OptionalParameter<IService> Services = new OptionalParameter<IService>(); - - public ServiceConfiguration() - : base() - { - } - - public ServiceConfiguration(string config) - { - TangConfig = new AvroConfigurationSerializer().FromString(config); - } - - public static ConfigurationModule ConfigurationModule - { - get - { - return new ServiceConfiguration() - .BindSetEntry(GenericType<ServicesSet>.Class, Services) - .Build(); - } - } - - public IConfiguration TangConfig { get; private set; } - } - - public class InjectedServices - { - [Inject] - public InjectedServices([Parameter(typeof(ServicesSet))] ISet<IService> services) - { - Services = services; - } - - public ISet<IService> Services { get; set; } - } - - [NamedParameter("Set of services", "servicesSet", "")] - class ServicesSet : Name<ISet<IService>> - { - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/services/ServicesConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/services/ServicesConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Common/services/ServicesConfigurationOptions.cs deleted file mode 100644 index 31a206a..0000000 --- a/lang/cs/Org.Apache.REEF.Common/services/ServicesConfigurationOptions.cs +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Services -{ - public class ServicesConfigurationOptions - { - [NamedParameter("Services", "services", "services")] - public class Services : Name<string> - { - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/tasks/IDriverMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/tasks/IDriverMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/tasks/IDriverMessageHandler.cs deleted file mode 100644 index 4c5d42f..0000000 --- a/lang/cs/Org.Apache.REEF.Common/tasks/IDriverMessageHandler.cs +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Tasks.Defaults; -using Org.Apache.REEF.Tasks.Events; -using Org.Apache.REEF.Tang.Annotations; - -namespace Org.Apache.REEF.Tasks -{ - //[DefaultImplementation(typeof(DefaultTaskMessageSource))] - public interface IDriverMessageHandler - { - void Handle(IDriverMessage message); - } -}
