http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs new file mode 100644 index 0000000..e495fda --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs @@ -0,0 +1,390 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using Org.Apache.Reef.Common.Context;
+using Org.Apache.Reef.Common.Evaluator;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Common.Runtime;
+using Org.Apache.Reef.Evaluator;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Remote;
+using Org.Apache.Reef.Wake.Remote.Impl;
+using Org.Apache.Reef.Wake.Time;
+using System;
+using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.Linq;
+using System.Net;
+using System.Threading;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// Assembles evaluator heartbeats and sends them to the driver. Regular heartbeats are driven by
+    /// <see cref="Alarm"/> events that this class reschedules on the runtime clock; event-based heartbeats
+    /// are triggered through the <c>OnNext</c> overloads. After <c>MaxHeartbeatFailures</c> consecutive send
+    /// failures the evaluator enters RECOVERY mode: heartbeats are queued, the driver is polled through
+    /// <see cref="IDriverConnection"/> on every alarm, and once it is found again the queue is replayed.
+    /// </summary>
+    public class HeartBeatManager : IObserver<Alarm>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager));
+
+        private static readonly MachineStatus MachineStatus = new MachineStatus();
+
+        // Reference point for the Java-compatible millisecond timestamps placed in every heartbeat.
+        private static readonly DateTime UnixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+
+        // Private monitor for heartbeat assembly. Replaces lock (this) so that code outside this class
+        // cannot contend on, or deadlock with, the same lock.
+        private readonly object _heartbeatLock = new object();
+
+        private readonly IRemoteManager<REEFMessage> _remoteManager;
+
+        private readonly IClock _clock;
+
+        private readonly int _heartBeatPeriodInMillSeconds;
+
+        private readonly int _maxHeartbeatRetries = 0;
+
+        private readonly string _evaluatorId;
+
+        // The queue can only contain the following:
+        // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state
+        // 2. event-based heartbeats generated in RECOVERY state (no regular heartbeat is attempted in that state)
+        // It is also the monitor that serializes Send against the replay in Recover.
+        private readonly Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>();
+
+        private IRemoteIdentifier _remoteId;
+
+        private IObserver<REEFMessage> _observer;
+
+        private int _heartbeatFailures = 0;
+
+        private IDriverConnection _driverConnection;
+
+        private EvaluatorSettings _evaluatorSettings;
+
+        /// <summary>
+        /// Creates the manager and binds a remote observer for the driver identified by <paramref name="remoteId"/>.
+        /// </summary>
+        /// <param name="settings">evaluator settings supplying the remote manager, clock, period and failure limit.</param>
+        /// <param name="remoteId">the driver's remote identifier.</param>
+        public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId)
+        {
+            using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager"))
+            {
+                _remoteManager = settings.RemoteManager;
+                _remoteId = remoteId;
+                _evaluatorId = settings.EvalutorId;
+                _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+                _clock = settings.RuntimeClock;
+                _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs;
+                _maxHeartbeatRetries = settings.MaxHeartbeatFailures;
+                EvaluatorSettings = settings;
+                MachineStatus.ToString(); // kick start the CPU perf counter
+            }
+        }
+
+        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+        public EvaluatorRuntime _evaluatorRuntime { get; set; }
+
+        [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")]
+        public ContextManager _contextManager { get; set; }
+
+        public EvaluatorSettings EvaluatorSettings
+        {
+            get
+            {
+                return _evaluatorSettings;
+            }
+
+            private set
+            {
+                _evaluatorSettings = value;
+            }
+        }
+
+        /// <summary>
+        /// Sends a heartbeat to the driver, or queues it while in RECOVERY mode.
+        /// A failed send is queued. When the number of consecutive failures reaches <c>MaxHeartbeatFailures</c>,
+        /// an <see cref="IDriverConnection"/> is injected and the evaluator switches to RECOVERY mode.
+        /// </summary>
+        /// <param name="evaluatorHeartbeatProto">the heartbeat to deliver.</param>
+        /// <remarks>
+        /// A send failure while no task is RUNNING is reported through <c>Exceptions.Throw</c>:
+        /// recovery is only supported while a task is running.
+        /// </remarks>
+        public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto)
+        {
+            lock (_queuedHeartbeats)
+            {
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY)
+                {
+                    LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto));
+                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+                    return;
+                }
+
+                // NOT during recovery, try to send
+                REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto);
+                try
+                {
+                    _observer.OnNext(payload);
+                    _heartbeatFailures = 0; // reset the failure count: only continuous failures lead to RECOVERY
+                }
+                catch (Exception e)
+                {
+                    if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING)
+                    {
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER);
+                    }
+
+                    _heartbeatFailures++;
+
+                    _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto);
+                    LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e);
+
+                    if (_heartbeatFailures >= _maxHeartbeatRetries)
+                    {
+                        LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures));
+                        try
+                        {
+                            _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>();
+                        }
+                        catch (Exception ex)
+                        {
+                            Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER);
+                        }
+                        LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection);
+                        _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY;
+
+                        // clean heartbeat failure
+                        _heartbeatFailures = 0;
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Assemble a complete new heartbeat and send it out.
+        /// </summary>
+        public void OnNext()
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()");
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto();
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+                Send(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific TaskStatus that must be delivered to the driver.
+        /// </summary>
+        /// <param name="taskStatusProto">the task status to include in the heartbeat.</param>
+        public void OnNext(TaskStatusProto taskStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)");
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
+                    _evaluatorRuntime.GetEvaluatorStatus(),
+                    _contextManager.GetContextStatusCollection(),
+                    Optional<TaskStatusProto>.Of(taskStatusProto));
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+                Send(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific ContextStatusProto that must be delivered to the driver.
+        /// The given status is placed first, followed by the context manager's current statuses.
+        /// </summary>
+        /// <param name="contextStatusProto">the context status to include in the heartbeat.</param>
+        public void OnNext(ContextStatusProto contextStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)");
+                List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto> { contextStatusProto };
+                contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection());
+                EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(
+                    _evaluatorRuntime.GetEvaluatorStatus(),
+                    contextStatusProtos,
+                    Optional<TaskStatusProto>.Empty());
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+                Send(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Called with a specific EvaluatorStatus that must be delivered to the driver.
+        /// The heartbeat carries only the timestamp and this status.
+        /// </summary>
+        /// <param name="evaluatorStatusProto">the evaluator status to deliver.</param>
+        public void OnNext(EvaluatorStatusProto evaluatorStatusProto)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)");
+                EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto()
+                {
+                    timestamp = CurrentTimeMilliSeconds(),
+                    evaluator_status = evaluatorStatusProto
+                };
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto));
+                Send(heartbeatProto);
+            }
+        }
+
+        /// <summary>
+        /// Periodic tick. Handles the current state and then schedules the next alarm.
+        /// When OPERATIONAL with a RUNNING runtime, a regular heartbeat is sent.
+        /// When in RECOVERY, the driver connection is polled for a restarted driver.
+        /// </summary>
+        /// <param name="value">the alarm that fired (unused).</param>
+        public void OnNext(Alarm value)
+        {
+            LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)");
+            lock (_heartbeatLock)
+            {
+                LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)");
+                if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING)
+                {
+                    EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto();
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString()));
+                    Send(evaluatorHeartbeatProto);
+                }
+                else
+                {
+                    LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState, _evaluatorRuntime.State));
+
+                    // Only poll for a restarted driver while recovering. _driverConnection is only injected when
+                    // RECOVERY is entered. Polling in other states dereferenced null (an exception logged on every
+                    // period), or, after a completed recovery, needlessly re-ran Recover.
+                    if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY && _driverConnection != null)
+                    {
+                        TryReconnectToDriver();
+                    }
+                }
+
+                _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this);
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Milliseconds elapsed since Jan 1st 1970 (UTC), matching the Java implementation's timestamps.
+        /// </summary>
+        private static long CurrentTimeMilliSeconds()
+        {
+            return (long)(DateTime.UtcNow - UnixEpoch).TotalMilliseconds;
+        }
+
+        /// <summary>
+        /// Asks the driver connection whether the driver is reachable again and, if so, runs <see cref="Recover"/>.
+        /// Any exception is logged and swallowed, so that polling continues on the next alarm.
+        /// </summary>
+        private void TryReconnectToDriver()
+        {
+            try
+            {
+                DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId);
+                if (driverInformation == null)
+                {
+                    LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later.");
+                }
+                else
+                {
+                    LOGGER.Log(
+                        Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId));
+                    Recover(driverInformation);
+                }
+            }
+            catch (Exception e)
+            {
+                // we do not want any exception to stop the query for driver status
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Re-points the remote observer at the restarted driver and restarts the name client (if any).
+        /// It then replays every queued heartbeat (the first one flagged as a recovery heartbeat) and
+        /// returns the evaluator to OPERATIONAL.
+        /// </summary>
+        /// <param name="driverInformation">endpoint and name-server information of the restarted driver.</param>
+        private void Recover(DriverInformation driverInformation)
+        {
+            IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier);
+            _remoteId = new SocketRemoteIdentifier(driverEndpoint);
+            _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId));
+            lock (_evaluatorSettings)
+            {
+                if (_evaluatorSettings.NameClient != null)
+                {
+                    try
+                    {
+                        LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server" + driverInformation.NameServerId);
+                        _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId));
+                        LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId);
+                    }
+                    catch (Exception e)
+                    {
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+                    }
+                }
+            }
+
+            lock (_queuedHeartbeats)
+            {
+                bool firstHeartbeatInQueue = true;
+                while (_queuedHeartbeats.Count > 0)
+                {
+                    LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId);
+                    try
+                    {
+                        if (firstHeartbeatInQueue)
+                        {
+                            // the first heartbeat is specially constructed to include the recovery flag
+                            EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue());
+                            LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat);
+                            _observer.OnNext(new REEFMessage(recoveryHeartbeat));
+                            firstHeartbeatInQueue = false;
+                        }
+                        else
+                        {
+                            _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue()));
+                        }
+                    }
+                    catch (Exception e)
+                    {
+                        // we do not handle failures during RECOVERY
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(
+                            e,
+                            Level.Error,
+                            string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId),
+                            LOGGER);
+                    }
+                    Thread.Sleep(500);
+                }
+            }
+            _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL;
+            LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ===========");
+        }
+
+        /// <summary>
+        /// Marks a heartbeat, its context statuses and (when present) its task status with the recovery flag.
+        /// A heartbeat without a task status or context list no longer causes a NullReferenceException.
+        /// </summary>
+        /// <param name="heartbeat">the heartbeat to flag; it is modified in place.</param>
+        /// <returns>the same heartbeat instance.</returns>
+        private static EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat)
+        {
+            heartbeat.recovery = true;
+            if (heartbeat.context_status != null)
+            {
+                heartbeat.context_status.ForEach(c => c.recovery = true);
+            }
+            if (heartbeat.task_status != null)
+            {
+                heartbeat.task_status.recovery = true;
+            }
+            return heartbeat;
+        }
+
+        /// <summary>
+        /// Builds a heartbeat from the runtime's evaluator status and the context manager's context and task statuses.
+        /// </summary>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto()
+        {
+            return GetEvaluatorHeartbeatProto(
+                _evaluatorRuntime.GetEvaluatorStatus(),
+                _contextManager.GetContextStatusCollection(),
+                _contextManager.GetTaskStatus());
+        }
+
+        /// <summary>
+        /// Builds a timestamped heartbeat from the given statuses; the task status is set only when present.
+        /// </summary>
+        private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto(
+            EvaluatorStatusProto evaluatorStatusProto,
+            ICollection<ContextStatusProto> contextStatusProtos,
+            Optional<TaskStatusProto> taskStatusProto)
+        {
+            EvaluatorHeartbeatProto evaluatorHeartbeatProto = new EvaluatorHeartbeatProto()
+            {
+                timestamp = CurrentTimeMilliSeconds(),
+                evaluator_status = evaluatorStatusProto
+            };
+            foreach (ContextStatusProto contextStatusProto in contextStatusProtos)
+            {
+                evaluatorHeartbeatProto.context_status.Add(contextStatusProto);
+            }
+            if (taskStatusProto.IsPresent())
+            {
+                evaluatorHeartbeatProto.task_status = taskStatusProto.Value;
+            }
+            return evaluatorHeartbeatProto;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs new file mode 100644 index 0000000..8a7aa94 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Wake.Remote;
+using System;
+using System.Globalization;
+using System.Threading;
+
+namespace Org.Apache.Reef.Common
+{
+    /// <summary>
+    /// Receives <see cref="REEFMessage"/>s from the remote layer.
+    /// It logs any evaluator context-control request they carry and tracks message throughput.
+    /// It then forwards every message to the single subscribed observer, if any.
+    /// </summary>
+    public class ReefMessageProtoObserver :
+        IObserver<IRemoteMessage<REEFMessage>>,
+        IObservable<IRemoteMessage<REEFMessage>>,
+        IDisposable
+    {
+        // Number of messages per throughput measurement window.
+        private const int PrintBatchSize = 100000;
+
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver));
+        private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null;
+        private long _count = 0;
+
+        // Start time (UTC) of the current throughput window.
+        private DateTime _begin;
+
+        public void OnCompleted()
+        {
+        }
+
+        public void OnError(Exception error)
+        {
+        }
+
+        /// <summary>
+        /// Logs the incoming message (and the kind of context control it requests), records it for the
+        /// throughput statistics, and forwards it to the subscribed observer.
+        /// </summary>
+        /// <param name="value">the received message together with the sender's identifier.</param>
+        public void OnNext(IRemoteMessage<REEFMessage> value)
+        {
+            REEFMessage remoteEvent = value.Message;
+            IRemoteIdentifier id = value.Identifier;
+            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id));
+
+            if (remoteEvent.evaluatorControl != null && remoteEvent.evaluatorControl.context_control != null)
+            {
+                var contextControl = remoteEvent.evaluatorControl.context_control;
+                string contextMessage = null;
+                string taskMessage = null;
+
+                if (contextControl.context_message != null)
+                {
+                    contextMessage = contextControl.context_message.ToString();
+                }
+                if (contextControl.task_message != null)
+                {
+                    taskMessage = ByteUtilities.ByteArrarysToString(contextControl.task_message);
+                }
+
+                if (!(string.IsNullOrEmpty(contextMessage) && string.IsNullOrEmpty(taskMessage)))
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", contextMessage, taskMessage));
+                }
+                else if (contextControl.remove_context != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", contextControl.remove_context.context_id));
+                }
+                else if (contextControl.add_context != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", contextControl.add_context.parent_context_id));
+                }
+                else if (contextControl.start_task != null)
+                {
+                    LOGGER.Log(Level.Info,
+                        string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", contextControl.start_task.context_id));
+                }
+                else if (contextControl.stop_task != null)
+                {
+                    LOGGER.Log(Level.Info, "Control protobuf to stop task");
+                }
+                else if (contextControl.suspend_task != null)
+                {
+                    LOGGER.Log(Level.Info, "Control protobuf to suspend task");
+                }
+            }
+
+            RecordThroughput();
+
+            var observer = _observer;
+            if (observer != null)
+            {
+                observer.OnNext(value);
+            }
+        }
+
+        /// <summary>
+        /// Registers the single downstream observer.
+        /// </summary>
+        /// <param name="observer">the observer that receives every forwarded message.</param>
+        /// <returns>
+        /// This instance, whose <see cref="Dispose"/> removes the observer; or <c>null</c> when an observer is
+        /// already registered, in which case the new one is ignored.
+        /// </returns>
+        public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer)
+        {
+            if (_observer != null)
+            {
+                return null;
+            }
+            _observer = observer;
+            return this;
+        }
+
+        /// <summary>
+        /// Removes the subscribed observer; later messages are no longer forwarded.
+        /// </summary>
+        public void Dispose()
+        {
+            _observer = null;
+        }
+
+        /// <summary>
+        /// Counts one received message. Every <see cref="PrintBatchSize"/> messages, it logs the throughput
+        /// of the last window at Verbose level. Previously this rate was computed but never used.
+        /// </summary>
+        private void RecordThroughput()
+        {
+            long count = Interlocked.Increment(ref _count);
+            if (count == 1)
+            {
+                // Keying off the incremented value (not a separate, racy read of _count) guarantees that
+                // exactly one caller opens the first window.
+                _begin = DateTime.UtcNow;
+            }
+
+            if (count % PrintBatchSize != 0)
+            {
+                return;
+            }
+
+            DateTime end = DateTime.UtcNow;
+            double seconds = (end - _begin).TotalSeconds;
+
+            // A zero-length window (coarse clock resolution) would otherwise divide by zero and feed an
+            // infinite double into a long conversion.
+            if (seconds > 0)
+            {
+                long eventsPerSecond = (long)(PrintBatchSize / seconds);
+                LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Received {0} REEF messages so far; last {1} at {2} messages/second.", count, PrintBatchSize, eventsPerSecond));
+            }
+
+            _begin = end;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs new file mode 100644 index 0000000..31194a7 --- /dev/null +++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Raised when user-supplied context code fails. It records the id of the failing context and the
+    /// optional id of its parent.
+    /// </summary>
+    public class ContextClientCodeException : Exception
+    {
+        /// <summary>
+        /// Creates the exception; the message is prefixed with the id of the failing context.
+        /// </summary>
+        /// <param name="contextId">id of the context that failed.</param>
+        /// <param name="parentId">id of the failed context's parent, if it has one.</param>
+        /// <param name="message">description of the failure.</param>
+        /// <param name="cause">the exception raised by the context code.</param>
+        public ContextClientCodeException(
+            string contextId,
+            Optional<string> parentId,
+            string message,
+            Exception cause)
+            : base(string.Concat("Failure in context '", contextId, "': ", message), cause)
+        {
+            ContextId = contextId;
+            ParentId = parentId;
+        }
+
+        /// <summary>Id of the context that failed.</summary>
+        public string ContextId { get; private set; }
+
+        /// <summary>Id of the failed context's parent, if any.</summary>
+        public Optional<string> ParentId { get; private set; }
+
+        /// <summary>
+        /// Intended to extract the context id from a configuration. It is not implemented yet and
+        /// always returns <see cref="string.Empty"/>.
+        /// </summary>
+        /// <param name="c">the configuration to inspect (currently ignored).</param>
+        /// <returns>an empty string.</returns>
+        public static string GetId(IConfiguration c)
+        {
+            // TODO: update after TANG is available
+            return string.Empty;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
new file mode 100644
index 0000000..ca6b949
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Formats;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.IO;
+using Org.Apache.Reef.Tang.Types;
+
+namespace Org.Apache.Reef.Common.Evaluator.Context
+{
+    /// <summary>
+    /// Context configuration built from an Avro configuration string. It exposes the context identifier
+    /// and the container directory. The remaining <see cref="IConfiguration"/> members throw
+    /// <see cref="NotImplementedException"/>.
+    /// </summary>
+    public class ContextConfiguration : IConfiguration
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration));
+
+        // Binding key -> value, with any key mentioning the context identifier normalized to it.
+        private readonly Dictionary<string, string> _settings;
+
+        /// <summary>
+        /// Parses the bindings from <paramref name="configString"/> and records the current directory as the
+        /// container directory.
+        /// </summary>
+        /// <param name="configString">the embedded Avro configuration string.</param>
+        /// <remarks>
+        /// A missing context identifier is reported with an <see cref="ArgumentException"/> through
+        /// <c>Exceptions.Throw</c>.
+        /// </remarks>
+        public ContextConfiguration(string configString)
+        {
+            using (LOGGER.LogFunction("ContextConfiguration::ContextConfigurationn"))
+            {
+                ContainerDirectory = Directory.GetCurrentDirectory();
+                _settings = new Dictionary<string, string>();
+
+                AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
+                foreach (ConfigurationEntry entry in avroConfiguration.Bindings)
+                {
+                    NormalizeContextIdentifierKey(entry);
+                    _settings.Add(entry.key, entry.value);
+                }
+
+                if (_settings.ContainsKey(Reef.Evaluator.Constants.ContextIdentifier))
+                {
+                    return;
+                }
+
+                const string missingIdMessage = "Required parameter ContextIdentifier not provided.";
+                LOGGER.Log(Level.Error, missingIdMessage);
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(missingIdMessage), LOGGER);
+            }
+        }
+
+        /// <summary>The context identifier bound in the configuration.</summary>
+        public string Id
+        {
+            get { return _settings[Reef.Evaluator.Constants.ContextIdentifier]; }
+        }
+
+        /// <summary>Working directory of the container; initialized to the process's current directory.</summary>
+        public string ContainerDirectory { get; set; }
+
+        public IConfigurationBuilder newBuilder()
+        {
+            throw new NotImplementedException();
+        }
+
+        public string GetNamedParameter(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IClassHierarchy GetClassHierarchy()
+        {
+            throw new NotImplementedException();
+        }
+
+        public ISet<object> GetBoundSet(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IClassNode GetBoundConstructor(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IClassNode GetBoundImplementation(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IConstructorDef GetLegacyConstructor(IClassNode cn)
+        {
+            throw new NotImplementedException();
+        }
+
+        public ICollection<IClassNode> GetBoundImplementations()
+        {
+            throw new NotImplementedException();
+        }
+
+        public ICollection<IClassNode> GetBoundConstructors()
+        {
+            throw new NotImplementedException();
+        }
+
+        public ICollection<INamedParameterNode> GetNamedParameters()
+        {
+            throw new NotImplementedException();
+        }
+
+        public ICollection<IClassNode> GetLegacyConstructors()
+        {
+            throw new NotImplementedException();
+        }
+
+        public IList<object> GetBoundList(INamedParameterNode np)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets()
+        {
+            throw new NotImplementedException();
+        }
+
+        public IDictionary<INamedParameterNode, IList<object>> GetBoundList()
+        {
+            throw new NotImplementedException();
+        }
+
+        /// <summary>
+        /// Rewrites, in place, the key of any binding whose key contains the context identifier to exactly
+        /// that identifier, and logs the detected value.
+        /// </summary>
+        private static void NormalizeContextIdentifierKey(ConfigurationEntry entry)
+        {
+            if (!entry.key.Contains(Reef.Evaluator.Constants.ContextIdentifier))
+            {
+                return;
+            }
+
+            entry.key = Reef.Evaluator.Constants.ContextIdentifier;
+            LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", entry.key, entry.value));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
new file mode 100644
index 0000000..9967258
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Events;
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// This class is used to trigger all the context life-cycle dependent events.
+    /// </summary>
+    /// <remarks>
+    /// Dispatch to the start/stop handlers and to a context message handler is not wired up yet (see the
+    /// TODOs below). At present only the context message sources are exposed.
+    /// </remarks>
+    internal class ContextLifeCycle
+    {
+        private readonly HashSet<IObserver<IContextStart>> _contextStartHandlers;
+
+        private readonly HashSet<IObserver<IContextStop>> _contextStopHandlers;
+
+        private readonly HashSet<IContextMessageSource> _contextMessageSources;
+
+        /// <summary>
+        /// Creates the life cycle of a context with the given handlers and message sources.
+        /// A <c>null</c> set is treated as empty, so the accessors below never fail on a missing injection.
+        /// </summary>
+        /// <param name="id">id of the context.</param>
+        /// <param name="contextStartHandlers">handlers for the context start event.</param>
+        /// <param name="contextStopHandlers">handlers for the context stop event.</param>
+        /// <param name="contextMessageSources">sources of context messages.</param>
+        // @Inject
+        public ContextLifeCycle(
+            string id,
+            HashSet<IObserver<IContextStart>> contextStartHandlers,
+            HashSet<IObserver<IContextStop>> contextStopHandlers,
+            HashSet<IContextMessageSource> contextMessageSources)
+        {
+            Id = id;
+            _contextStartHandlers = contextStartHandlers ?? new HashSet<IObserver<IContextStart>>();
+            _contextStopHandlers = contextStopHandlers ?? new HashSet<IObserver<IContextStop>>();
+            _contextMessageSources = contextMessageSources ?? new HashSet<IContextMessageSource>();
+        }
+
+        /// <summary>
+        /// Creates the life cycle of a context with no handlers and no message sources.
+        /// </summary>
+        /// <param name="contextId">id of the context.</param>
+        public ContextLifeCycle(string contextId)
+            : this(contextId, null, null, null)
+        {
+        }
+
+        public string Id { get; private set; }
+
+        /// <summary>
+        /// The live set of configured context message sources (not a copy).
+        /// </summary>
+        public HashSet<IContextMessageSource> ContextMessageSources
+        {
+            get { return _contextMessageSources; }
+        }
+
+        /// <summary>
+        /// Intended to fire ContextStart to all registered event handlers; currently a no-op.
+        /// </summary>
+        public void Start()
+        {
+            // TODO: enable - deliver a new ContextStartImpl(Id) to every handler in _contextStartHandlers.
+        }
+
+        /// <summary>
+        /// Intended to fire ContextStop to all registered event handlers; currently a no-op.
+        /// </summary>
+        public void Close()
+        {
+            // TODO: enable - deliver a new ContextStopImpl(Id) to every handler in _contextStopHandlers.
+        }
+
+        /// <summary>
+        /// Intended to forward a message to the context message handler; currently a no-op.
+        /// </summary>
+        /// <param name="message">the serialized message (currently ignored).</param>
+        public void HandleContextMessage(byte[] message)
+        {
+            // TODO: forward the message to the context message handler once one is configured.
+        }
+
+        /// <summary>
+        /// get the set of ContextMessageSources configured
+        /// </summary>
+        /// <returns>(a shallow copy of) the set of ContextMessageSources configured.</returns>
+        public HashSet<IContextMessageSource> GetContextMessageSources()
+        {
+            return new HashSet<IContextMessageSource>(_contextMessageSources);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
new file mode 100644
index 0000000..15b09b9
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Evaluator.Context;
+using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Common.Task;
+using Org.Apache.Reef.Evaluator;
+using Org.Apache.Reef.Services;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using System;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Globalization;
+using System.Linq;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Maintains the stack of contexts on this evaluator: the root context is pushed first by Start(),
+    /// child contexts are pushed on top by AddContext, and tasks are started on the top-most context.
+    /// </summary>
+    public class ContextManager : IDisposable
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager));
+
+        // Also used as the lock object for operations that modify the stack.
+        private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>();
+
+        private readonly HeartBeatManager _heartBeatManager;
+
+        private readonly RootContextLauncher _rootContextLauncher;
+
+        /// <summary>
+        /// Creates the manager and prepares (but does not start) the root context launcher.
+        /// </summary>
+        /// <param name="heartBeatManager">heartbeat manager used to report status; also supplies the root context configuration.</param>
+        /// <param name="rootServiceConfig">optional service configuration for the root context.</param>
+        /// <param name="rootTaskConfig">optional configuration of a task to launch on the root context.</param>
+        public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+        {
+            using (LOGGER.LogFunction("ContextManager::ContextManager"))
+            {
+                _heartBeatManager = heartBeatManager;
+                _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig);
+            }
+        }
+
+        /// <summary>
+        /// Start the context manager. This initiates the root context and, if one was configured,
+        /// launches the initial task on it. A TaskClientCodeException from the task launch is
+        /// reported to the driver via a heartbeat instead of being propagated.
+        /// </summary>
+        public void Start()
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime rootContext = _rootContextLauncher.GetRootContext();
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id));
+                _contextStack.Push(rootContext);
+
+                if (_rootContextLauncher.RootTaskConfig.IsPresent())
+                {
+                    LOGGER.Log(Level.Info, "Launching the initial Task");
+                    try
+                    {
+                        _contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
+                    }
+                    catch (TaskClientCodeException e)
+                    {
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER);
+                        HandleTaskException(e);
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Returns true when no context is on the stack.
+        /// </summary>
+        public bool ContextStackIsEmpty()
+        {
+            lock (_contextStack)
+            {
+                return (_contextStack.Count == 0);
+            }
+        }
+
+        // TODO: codes here are slightly different from java since the protobuf.net does not generate the HasXXX method, may want to switch to proto-port later
+
+        /// <summary>
+        /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
+        /// This also triggers the HeartBeatManager to send a heartbeat with the result of this operation.
+        /// Task and context client-code failures are reported via a heartbeat and then rethrown.
+        /// </summary>
+        /// <param name="controlMessage">the control message received from the driver.</param>
+        public void HandleTaskControl(ContextControlProto controlMessage)
+        {
+            try
+            {
+                byte[] message = controlMessage.task_message;
+                if (controlMessage.add_context != null && controlMessage.remove_context != null)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER);
+                }
+                if (controlMessage.add_context != null)
+                {
+                    LOGGER.Log(Level.Info, "AddContext");
+                    AddContext(controlMessage.add_context);
+                    // support submitContextAndTask()
+                    if (controlMessage.start_task != null)
+                    {
+                        LOGGER.Log(Level.Info, "StartTask");
+                        StartTask(controlMessage.start_task);
+                    }
+                    else
+                    {
+                        // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime
+                        // Therefore this call can not go into addContext
+                        LOGGER.Log(Level.Info, "Trigger Heartbeat");
+                        _heartBeatManager.OnNext();
+                    }
+                }
+                else if (controlMessage.remove_context != null)
+                {
+                    LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id));
+                    RemoveContext(controlMessage.remove_context.context_id);
+                }
+                else if (controlMessage.start_task != null)
+                {
+                    LOGGER.Log(Level.Info, "StartTask only");
+                    StartTask(controlMessage.start_task);
+                }
+                else if (controlMessage.stop_task != null)
+                {
+                    LOGGER.Log(Level.Info, "CloseTask");
+                    _contextStack.Peek().CloseTask(message);
+                }
+                else if (controlMessage.suspend_task != null)
+                {
+                    LOGGER.Log(Level.Info, "SuspendTask");
+                    _contextStack.Peek().SuspendTask(message);
+                }
+                else if (controlMessage.task_message != null)
+                {
+                    LOGGER.Log(Level.Info, "DeliverTaskMessage");
+                    _contextStack.Peek().DeliverTaskMessage(message);
+                }
+                else if (controlMessage.context_message != null)
+                {
+                    LOGGER.Log(Level.Info, "Handle context contol message");
+                    ContextMessageProto contextMessageProto = controlMessage.context_message;
+                    bool deliveredMessage = false;
+                    foreach (ContextRuntime context in _contextStack)
+                    {
+                        if (context.Id.Equals(contextMessageProto.context_id))
+                        {
+                            LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message));
+                            context.HandleContextMessaage(controlMessage.context_message.message);
+                            deliveredMessage = true;
+                            break;
+                        }
+                    }
+                    if (!deliveredMessage)
+                    {
+                        InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id));
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                    }
+                }
+                else
+                {
+                    InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString()));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+            }
+            catch (Exception e)
+            {
+                // Report client-code failures to the driver first, then rethrow regardless of type.
+                if (e is TaskClientCodeException)
+                {
+                    HandleTaskException((TaskClientCodeException)e);
+                }
+                else if (e is ContextClientCodeException)
+                {
+                    HandleContextException((ContextClientCodeException)e);
+                }
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER);
+            }
+        }
+
+        /// <summary>
+        /// Get TaskStatusProto of the currently running task, if there is any
+        /// </summary>
+        /// <returns>the TaskStatusProto of the currently running task, or Empty when the stack is empty or no task is running.</returns>
+        public Optional<TaskStatusProto> GetTaskStatus()
+        {
+            // NOTE(review): unlike Start/AddContext/RemoveContext this reads _contextStack without taking
+            // its lock; confirm callers serialize access before relying on it.
+            if (_contextStack.Count == 0)
+            {
+                return Optional<TaskStatusProto>.Empty();
+            }
+            return _contextStack.Peek().GetTaskStatus();
+        }
+
+        /// <summary>
+        /// get status of all contexts in the stack.
+        /// </summary>
+        /// <returns>the status of all contexts in the stack, ordered from the top-most context to the root.</returns>
+        public ICollection<ContextStatusProto> GetContextStatusCollection()
+        {
+            ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>();
+            foreach (ContextRuntime runtime in _contextStack)
+            {
+                ContextStatusProto contextStatusProto = runtime.GetContextStatus();
+                LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto));
+                result.Add(contextStatusProto);
+            }
+            return result;
+        }
+
+        /// <summary>
+        /// Shuts down. This forcefully kills the Task if there is one and then shuts down all Contexts on the stack,
+        /// starting at the top.
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_contextStack)
+            {
+                if (_contextStack.Count > 0)
+                {
+                    LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime.");
+                    // Stack<T> enumerates from top to bottom, so Last() is the root (bottom-most) context.
+                    // ContextRuntime.Dispose() closes its task and disposes its child context before closing
+                    // its own life cycle, so disposing the root tears the stack down from the top.
+                    ContextRuntime runtime = _contextStack.Last();
+                    if (runtime != null)
+                    {
+                        runtime.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Add a context to the stack, as a child of the current top-most context.
+        /// </summary>
+        /// <param name="addContextProto">the request; its parent_context_id must match the top context id.</param>
+        private void AddContext(AddContextProto addContextProto)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentTopContext = _contextStack.Peek();
+                if (!currentTopContext.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}",
+                        addContextProto.parent_context_id,
+                        currentTopContext.Id));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                string contextConfigString = addContextProto.context_configuration;
+                ContextConfiguration contextConfiguration = new ContextConfiguration(contextConfigString);
+                ContextRuntime newTopContext;
+                if (addContextProto.service_configuration != null)
+                {
+                    ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration);
+                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig);
+                }
+                else
+                {
+                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration);
+                }
+                _contextStack.Push(newTopContext);
+            }
+        }
+
+        /// <summary>
+        /// Remove the context with the given ID from the stack. Only the top-most context may be removed.
+        /// </summary>
+        /// <param name="contextId"> context id</param>
+        private void RemoveContext(string contextId)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentTopContext = _contextStack.Peek();
+                string currentTopContextId = currentTopContext.Id;
+                if (!contextId.Equals(currentTopContextId, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContextId));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                currentTopContext.Dispose();
+                if (_contextStack.Count > 1)
+                {
+                    // We did not close the root context. Therefore, we need to inform the
+                    // driver explicitly that this context is closed. The root context notification
+                    // is implicit in the Evaluator close/done notification.
+                    _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state
+                }
+                _contextStack.Pop();
+            }
+        }
+
+        /// <summary>
+        /// Launch a Task on the top-most context; the request's context id must match that context.
+        /// </summary>
+        /// <param name="startTaskProto">the task launch request.</param>
+        private void StartTask(StartTaskProto startTaskProto)
+        {
+            lock (_contextStack)
+            {
+                ContextRuntime currentActiveContext = _contextStack.Peek();
+                string expectedContextId = startTaskProto.context_id;
+                if (!expectedContextId.Equals(currentActiveContext.Id, StringComparison.OrdinalIgnoreCase))
+                {
+                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration);
+                currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager);
+            }
+        }
+
+        /// <summary>
+        /// Reports a failed task to the driver by sending a heartbeat with a FAILED TaskStatusProto.
+        /// Ported from Java, where callers had to hold the HeartBeatManager lock; this class never takes
+        /// that lock itself, so any such requirement now lies with HeartBeatManager.
+        /// </summary>
+        /// <param name="e">the task client-code failure.</param>
+        private void HandleTaskException(TaskClientCodeException e)
+        {
+            LOGGER.Log(Level.Error, "TaskClientCodeException", e);
+            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
+            TaskStatusProto taskStatus = new TaskStatusProto()
+            {
+                context_id = e.ContextId,
+                task_id = e.TaskId,
+                result = exception,
+                state = State.FAILED
+            };
+            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeatb for a failed task: {0}", taskStatus.ToString()));
+            _heartBeatManager.OnNext(taskStatus);
+        }
+
+        /// <summary>
+        /// Reports a failed context to the driver by sending a heartbeat with a FAIL ContextStatusProto.
+        /// Ported from Java, where callers had to hold the HeartBeatManager lock; this class never takes
+        /// that lock itself, so any such requirement now lies with HeartBeatManager.
+        /// </summary>
+        /// <param name="e">the context client-code failure.</param>
+        private void HandleContextException(ContextClientCodeException e)
+        {
+            LOGGER.Log(Level.Error, "ContextClientCodeException", e);
+            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
+            ContextStatusProto contextStatusProto = new ContextStatusProto()
+            {
+                context_id = e.ContextId,
+                context_state = ContextStatusProto.State.FAIL,
+                error = exception
+            };
+            if (e.ParentId.IsPresent())
+            {
+                contextStatusProto.parent_id = e.ParentId.Value;
+            }
+            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString()));
+            _heartBeatManager.OnNext(contextStatusProto);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextRuntime.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextRuntime.cs
new file mode 100644
index 0000000..012e436
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextRuntime.cs
@@ -0,0 +1,478 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Evaluator.Context;
+using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
+using Org.Apache.Reef.Common.Task;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Runtime state of one context: its injectors, its optional parent and child contexts,
+    /// and the task currently running on it, if any.
+    /// </summary>
+    public class ContextRuntime
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime));
+
+        // Context-local injector. This contains information that will not be available in child injectors.
+        private readonly IInjector _contextInjector;
+
+        // Service injector. State in this injector moves to child injectors.
+        private readonly IInjector _serviceInjector;
+
+        // Convenience class to hold all the event handlers for the context as well as the service instances.
+        // It is also the lock object guarding _childContext, _task and _contextState.
+        private readonly ContextLifeCycle _contextLifeCycle;
+
+        // The child context, if any.
+        private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty();
+
+        // The parent context, if any. Assigned only by the constructor.
+        private readonly Optional<ContextRuntime> _parentContext = Optional<ContextRuntime>.Empty();
+
+        // The currently running task, if any.
+        private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty();
+
+        private ContextStatusProto.State _contextState = ContextStatusProto.State.READY;
+
+        /// <summary>
+        /// Create a new ContextRuntime.
+        /// </summary>
+        /// <param name="serviceInjector">the service injector; the context injector is forked from it.</param>
+        /// <param name="contextConfiguration">the Configuration for this context; must be a ContextConfiguration.</param>
+        /// <param name="parentContext">the parent context, or Empty for the root context.</param>
+        public ContextRuntime(
+            IInjector serviceInjector,
+            IConfiguration contextConfiguration,
+            Optional<ContextRuntime> parentContext)
+        {
+            ContextConfiguration config = contextConfiguration as ContextConfiguration;
+            if (config == null)
+            {
+                var e = new ArgumentException("contextConfiguration is not of type ContextConfiguration");
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+            }
+            _contextLifeCycle = new ContextLifeCycle(config.Id);
+            _serviceInjector = serviceInjector;
+            _parentContext = parentContext;
+            try
+            {
+                _contextInjector = serviceInjector.ForkInjector();
+            }
+            catch (Exception e)
+            {
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+
+                // The context being constructed failed, so its own parent is the parent to report.
+                Optional<string> parentId = ParentContext.IsPresent() ?
+                    Optional<string>.Of(ParentContext.Value.Id) :
+                    Optional<string>.Empty();
+                ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e);
+
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+            }
+            // Trigger the context start events on contextInjector.
+            _contextLifeCycle.Start();
+        }
+
+        /// <summary>
+        /// Create a new ContextRuntime for the root context.
+        /// </summary>
+        /// <param name="serviceInjector">the serviceInjector to be used.</param>
+        /// <param name="contextConfiguration"> the Configuration for this context.</param>
+        public ContextRuntime(
+            IInjector serviceInjector,
+            IConfiguration contextConfiguration)
+            : this(serviceInjector, contextConfiguration, Optional<ContextRuntime>.Empty())
+        {
+            LOGGER.Log(Level.Info, "Instantiating root context");
+        }
+
+        /// <summary>The identifier of this context.</summary>
+        public string Id
+        {
+            get { return _contextLifeCycle.Id; }
+        }
+
+        /// <summary>The parent context, or Empty for the root context.</summary>
+        public Optional<ContextRuntime> ParentContext
+        {
+            get { return _parentContext; }
+        }
+
+        /// <summary>
+        /// Spawns a new context.
+        /// The new context will have a serviceInjector that is created by forking the one in this object with the given
+        /// serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector.
+        /// Failures while forking or constructing the child are reported as a ContextClientCodeException whose
+        /// parent id is this context's id.
+        /// </summary>
+        /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+        /// <param name="serviceConfiguration">the new context's service Configuration.</param>
+        /// <returns>a child context.</returns>
+        public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration, IConfiguration serviceConfiguration)
+        {
+            ContextRuntime childContext = null;
+            lock (_contextLifeCycle)
+            {
+                EnsureCanSpawnChildContext();
+                try
+                {
+                    IInjector childServiceInjector = _serviceInjector.ForkInjector(new IConfiguration[] { serviceConfiguration });
+                    childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this));
+                    _childContext = Optional<ContextRuntime>.Of(childContext);
+                    return childContext;
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER);
+
+                    // The context that failed to spawn would have been a child of this context,
+                    // so this context (not this context's own parent) is its parent.
+                    Optional<string> parentId = Optional<string>.Of(Id);
+                    ContextClientCodeException ex = new ContextClientCodeException(ContextClientCodeException.GetId(contextConfiguration), parentId, "Unable to spawn context", e);
+
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+                }
+            }
+            return childContext;
+        }
+
+        /// <summary>
+        /// Spawns a new context without services of its own.
+        /// The new context will have a serviceInjector that is created by forking the one in this object. The
+        /// contextConfiguration is used to fork the contextInjector from that new serviceInjector.
+        /// </summary>
+        /// <param name="contextConfiguration">the new context's context (local) Configuration.</param>
+        /// <returns> a child context.</returns>
+        public ContextRuntime SpawnChildContext(IConfiguration contextConfiguration)
+        {
+            lock (_contextLifeCycle)
+            {
+                EnsureCanSpawnChildContext();
+                IInjector childServiceInjector = _serviceInjector.ForkInjector();
+                ContextRuntime childContext = new ContextRuntime(childServiceInjector, contextConfiguration, Optional<ContextRuntime>.Of(this));
+                _childContext = Optional<ContextRuntime>.Of(childContext);
+                return childContext;
+            }
+        }
+
+        /// <summary>
+        /// Launches a Task on this context. Any previously ended task is cleared first; starting is rejected
+        /// while a task is still running or when this context already has a child context.
+        /// </summary>
+        /// <param name="taskConfiguration">the configuration of the task to launch.</param>
+        /// <param name="contextId">the id of the context the task runs on.</param>
+        /// <param name="heartBeatManager">heartbeat manager handed to the TaskRuntime.</param>
+        public void StartTask(TaskConfiguration taskConfiguration, string contextId, HeartBeatManager heartBeatManager)
+        {
+            lock (_contextLifeCycle)
+            {
+                bool taskPresent = _task.IsPresent();
+                bool taskEnded = taskPresent && _task.Value.HasEnded();
+
+                LOGGER.Log(Level.Info, "ContextRuntime::StartTask(TaskConfiguration)" + "task is present: " + taskPresent + " task has ended: " + taskEnded);
+                if (taskPresent)
+                {
+                    LOGGER.Log(Level.Info, "Task state: " + _task.Value.GetTaskState());
+                }
+
+                if (taskEnded)
+                {
+                    // clean up state
+                    _task = Optional<TaskRuntime>.Empty();
+                    taskPresent = false;
+                }
+                if (taskPresent)
+                {
+                    var e = new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Attempting to start a Task when a Task with id '{0}' is running", _task.Value.TaskId));
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                if (_childContext.IsPresent())
+                {
+                    var e = new InvalidOperationException("Attempting to start a Task on a context that is not the topmost active context.");
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+                try
+                {
+                    IInjector taskInjector = _contextInjector.ForkInjector(new IConfiguration[] { taskConfiguration.TangConfig });
+                    LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration.ToString());
+                    TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); // taskInjector.getInstance(TaskRuntime.class);
+                    taskRuntime.Initialize();
+                    // The task body runs on the thread pool; this method returns without waiting for it.
+                    System.Threading.Tasks.Task.Run(new Action(taskRuntime.Start));
+                    _task = Optional<TaskRuntime>.Of(taskRuntime);
+                }
+                catch (Exception e)
+                {
+                    var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e);
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Close this context. If there is a child context, this recursively closes it before closing this context. If
+        /// there is a Task currently running, that will be closed. Finally the parent's child reference is reset.
+        /// </summary>
+        public void Dispose()
+        {
+            lock (_contextLifeCycle)
+            {
+                _contextState = ContextStatusProto.State.DONE;
+                if (_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Shutting down an task because the underlying context is being closed.");
+                    _task.Value.Close(null);
+                }
+                if (_childContext.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Closing a context because its parent context is being closed.");
+                    _childContext.Value.Dispose();
+                }
+                _contextLifeCycle.Close();
+                if (_parentContext.IsPresent())
+                {
+                    ParentContext.Value.ResetChildContext();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Issue a suspend call to the Task
+        /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+        /// in the log.
+        /// </summary>
+        /// <param name="message"> the suspend message to deliver or null if there is none.</param>
+        public void SuspendTask(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Received a suspend task while there was no task running. Ignored");
+                }
+                else
+                {
+                    _task.Value.Suspend(message);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Issue a close call to the Task
+        /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+        /// in the log.
+        /// </summary>
+        /// <param name="message">the close message to deliver or null if there is none.</param>
+        public void CloseTask(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Received a close task while there was no task running. Ignored");
+                }
+                else
+                {
+                    _task.Value.Close(message);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Deliver a message to the Task
+        /// Note that due to races, the task might have already ended. In that case, we drop this call and leave a WARNING
+        /// in the log.
+        /// </summary>
+        /// <param name="message">the message to deliver or null if there is none.</param>
+        public void DeliverTaskMessage(byte[] message)
+        {
+            lock (_contextLifeCycle)
+            {
+                if (!_task.IsPresent())
+                {
+                    LOGGER.Log(Level.Warning, "Received an task message while there was no task running. Ignored");
+                }
+                else
+                {
+                    _task.Value.Deliver(message);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Forwards a context message to this context's life cycle.
+        /// </summary>
+        /// <param name="mesage">the message payload.</param>
+        public void HandleContextMessaage(byte[] mesage)
+        {
+            _contextLifeCycle.HandleContextMessage(mesage);
+        }
+
+        /// <summary>
+        /// get state of the running Task
+        /// </summary>
+        /// <returns> the state of the running Task, if one is running; Empty otherwise. An ended task is cleared.</returns>
+        public Optional<TaskStatusProto> GetTaskStatus()
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_task.IsPresent())
+                {
+                    if (_task.Value.HasEnded())
+                    {
+                        _task = Optional<TaskStatusProto>.Empty() == null ? _task : Optional<TaskRuntime>.Empty();
+                        return Optional<TaskStatusProto>.Empty();
+                    }
+                    else
+                    {
+                        TaskStatusProto taskStatusProto = _task.Value.GetStatusProto();
+                        if (taskStatusProto.state == State.RUNNING)
+                        {
+                            // only RUNNING status is allowed to return here, all other states are pushed out via heartbeat
+                            return Optional<TaskStatusProto>.Of(taskStatusProto);
+                        }
+                        var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task state must be RUNNING, but instead is in {0} state", taskStatusProto.state));
+                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                        return Optional<TaskStatusProto>.Empty();
+                    }
+                }
+                else
+                {
+                    return Optional<TaskStatusProto>.Empty();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Reset child context when parent is being closed
+        /// </summary>
+        public void ResetChildContext()
+        {
+            lock (_contextLifeCycle)
+            {
+                if (_childContext.IsPresent())
+                {
+                    _childContext = Optional<ContextRuntime>.Empty();
+                }
+                else
+                {
+                    var e = new InvalidOperationException("no child context set");
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+                }
+            }
+        }
+
+        /// <summary>
+        /// get context's status in protocol buffer
+        /// </summary>
+        /// <returns>this context's status in protocol buffer form, including any pending context messages.</returns>
+        public ContextStatusProto GetContextStatus()
+        {
+            lock (_contextLifeCycle)
+            {
+                ContextStatusProto contextStatusProto = new ContextStatusProto()
+                {
+                    context_id = Id,
+                    context_state = _contextState,
+                };
+                if (_parentContext.IsPresent())
+                {
+                    contextStatusProto.parent_id = _parentContext.Value.Id;
+                }
+
+                foreach (IContextMessageSource source in _contextLifeCycle.ContextMessageSources)
+                {
+                    Optional<ContextMessage> contextMessageOptional = source.Message;
+                    if (contextMessageOptional.IsPresent())
+                    {
+                        ContextStatusProto.ContextMessageProto contextMessageProto
+                            = new ContextStatusProto.ContextMessageProto()
+                            {
+                                source_id = contextMessageOptional.Value.MessageSourceId,
+                            };
+                        contextMessageProto.message = ByteUtilities.CopyBytesFrom(contextMessageOptional.Value.Bytes);
+                        contextStatusProto.context_message.Add(contextMessageProto);
+                    }
+                }
+                return contextStatusProto;
+            }
+        }
+
+        /// <summary>
+        /// Rejects spawning a child context (by passing an InvalidOperationException to Exceptions.Throw)
+        /// while a task is running or when a child context already exists. Called with _contextLifeCycle held.
+        /// </summary>
+        private void EnsureCanSpawnChildContext()
+        {
+            if (_task.IsPresent())
+            {
+                var e = new InvalidOperationException(
+                    string.Format(CultureInfo.InvariantCulture, "Attempting to spawn a child context when an Task with id '{0}' is running", _task.Value.TaskId)); // note: java code is putting thread id here
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+            }
+            if (_childContext.IsPresent())
+            {
+                var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context.");
+                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStartImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStartImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStartImpl.cs
new file mode 100644
index 0000000..3cf31b5
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStartImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Events;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// IContextStart implementation that carries the identifier of a context.
+    /// </summary>
+    internal class ContextStartImpl : IContextStart
+    {
+        /// <summary>
+        /// Creates the event for the context identified by <paramref name="id"/>.
+        /// </summary>
+        /// <param name="id">the context identifier.</param>
+        public ContextStartImpl(string id)
+        {
+            this.Id = id;
+        }
+
+        /// <summary>The context identifier supplied at construction.</summary>
+        public string Id { get; set; }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStopImpl.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStopImpl.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStopImpl.cs
new file mode 100644
index 0000000..5db45d1
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextStopImpl.cs
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Events;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Minimal <see cref="IContextStop"/> implementation that simply holds the id it was created with.
+    /// </summary>
+    internal class ContextStopImpl : IContextStop
+    {
+        /// <summary>
+        /// Identifier supplied to the constructor. The setter is public, so it may be reassigned later.
+        /// </summary>
+        public string Id { get; set; }
+
+        /// <summary>
+        /// Creates the instance and stores <paramref name="id"/> in <see cref="Id"/>.
+        /// </summary>
+        /// <param name="id">Value to expose through <see cref="Id"/>.</param>
+        public ContextStopImpl(string id)
+        {
+            this.Id = id;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2ae282de/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/RootContextLauncher.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/RootContextLauncher.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/RootContextLauncher.cs
new file mode 100644
index 0000000..d31aeed
--- /dev/null
+++ b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/RootContextLauncher.cs
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.Reef.Common.Evaluator.Context;
+using Org.Apache.Reef.Services;
+using Org.Apache.Reef.Tasks;
+using Org.Apache.Reef.Utilities;
+using Org.Apache.Reef.Utilities.Logging;
+using Org.Apache.Reef.Tang.Implementations;
+using Org.Apache.Reef.Tang.Interface;
+using System;
+using System.Globalization;
+
+namespace Org.Apache.Reef.Common.Context
+{
+    /// <summary>
+    /// Helper class that encapsulates the root context configuration: With or without services and an initial task.
+    /// </summary>
+    public sealed class RootContextLauncher
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(RootContextLauncher));
+
+        // Built once, in the constructor, from the optional service configuration.
+        private readonly IInjector _rootServiceInjector = null;
+
+        // Created on the first call to GetRootContext() and cached afterwards.
+        private ContextRuntime _rootContext = null;
+
+        private ContextConfiguration _rootContextConfiguration = null;
+
+        /// <summary>
+        /// Creates the launcher and immediately builds the root service injector.
+        /// </summary>
+        /// <param name="rootContextConfig">Configuration used when the root context is created.</param>
+        /// <param name="rootServiceConfig">Optional service configuration; when present, the services are resolved eagerly here.</param>
+        /// <param name="rootTaskConfig">Optional configuration of the initial task, exposed via <see cref="RootTaskConfig"/>.</param>
+        public RootContextLauncher(ContextConfiguration rootContextConfig, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
+        {
+            _rootContextConfiguration = rootContextConfig;
+            _rootServiceInjector = InjectServices(rootServiceConfig);
+            RootTaskConfig = rootTaskConfig;
+        }
+
+        /// <summary>
+        /// Optional configuration of the initial task supplied to the constructor.
+        /// </summary>
+        public Optional<TaskConfiguration> RootTaskConfig { get; set; }
+
+        /// <summary>
+        /// Configuration of the root context. Setting it after <see cref="GetRootContext()"/> has
+        /// been called does not affect the already-cached root context.
+        /// </summary>
+        public ContextConfiguration RootContextConfig
+        {
+            get { return _rootContextConfiguration; }
+            set { _rootContextConfiguration = value; }
+        }
+
+        /// <summary>
+        /// Returns the root <see cref="ContextRuntime"/>. It is created on the first call and the cached
+        /// instance is returned afterwards. No synchronization is performed.
+        /// </summary>
+        /// <returns>The root context runtime.</returns>
+        public ContextRuntime GetRootContext()
+        {
+            if (_rootContext == null)
+            {
+                _rootContext = GetRootContext(_rootServiceInjector, _rootContextConfiguration);
+            }
+            return _rootContext;
+        }
+
+        /// <summary>
+        /// Builds the injector used for the root context. When a service configuration is present, the
+        /// injector is created from it and <c>InjectedServices</c> is resolved immediately. Otherwise an
+        /// empty injector is returned.
+        /// </summary>
+        /// <param name="serviceConfig">Optional service configuration.</param>
+        /// <returns>The injector to use for the root context.</returns>
+        /// <exception cref="InvalidOperationException">
+        /// Passed to <c>Exceptions.Throw</c> when service resolution fails (assumed to throw it — confirm).
+        /// </exception>
+        private IInjector InjectServices(Optional<ServiceConfiguration> serviceConfig)
+        {
+            IInjector rootServiceInjector;
+
+            if (serviceConfig.IsPresent())
+            {
+                rootServiceInjector = TangFactory.GetTang().NewInjector(serviceConfig.Value.TangConfig);
+                InjectedServices services = null;
+                try
+                {
+                    services = rootServiceInjector.GetInstance<InjectedServices>();
+                }
+                catch (Exception e)
+                {
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Failed to instantiate service.", LOGGER);
+
+                    // Placeholders map one-to-one onto the arguments: {0}=exception, {1}=message, {2}=stack trace.
+                    // The original exception is also attached as InnerException so the cause is preserved.
+                    InvalidOperationException ex = new InvalidOperationException(
+                        string.Format(CultureInfo.InvariantCulture, "Failed to inject service: encountered error {0} with message [{1}] and stack trace:[{2}]", e, e.Message, e.StackTrace),
+                        e);
+
+                    // NOTE(review): assumes Exceptions.Throw always throws; otherwise 'services' is still null below.
+                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER);
+                }
+                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "injected {0} service(s)", services.Services.Count));
+            }
+            else
+            {
+                rootServiceInjector = TangFactory.GetTang().NewInjector();
+                LOGGER.Log(Level.Info, "no service provided for injection.");
+            }
+
+            return rootServiceInjector;
+        }
+
+        /// <summary>
+        /// Creates a new <see cref="ContextRuntime"/> from the given injector and configuration.
+        /// </summary>
+        private ContextRuntime GetRootContext(
+            IInjector rootServiceInjector,
+            IConfiguration rootContextConfiguration)
+        {
+            return new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+        }
+    }
+}
+//if (rootServiceInjector != null)
+//{
+//    try
+//    {
+//        rootServiceInjector = rootServiceInjector.ForkInjector(serviceConfigs);
+//    }
+//    catch (Exception e)
+//    {
+//        throw new ContextClientCodeException(ContextClientCodeException.GetId(rootContextConfiguration),
+//                                             Optional<String>.Empty(),
+//                                             "Unable to instatiate the root context", e);
+//    }
+//    result = new ContextRuntime(rootServiceInjector, rootContextConfiguration);
+//}
+//else
+//{
+//    result = new ContextRuntime(rootServiceInjector.ForkInjector(), rootContextConfiguration);
+//}
\ No newline at end of file
