http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs deleted file mode 100644 index 052764d..0000000 --- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorRuntime.cs +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using Org.Apache.Reef.Common.Context; -using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto; -using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol; -using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto; -using Org.Apache.Reef.Evaluator; -using Org.Apache.Reef.Utilities; -using Org.Apache.Reef.Utilities.Logging; -using Org.Apache.Reef.Tang.Annotations; -using Org.Apache.Reef.Tang.Interface; -using Org.Apache.Reef.Wake.Remote; -using Org.Apache.Reef.Wake.Time; -using Org.Apache.Reef.Wake.Time.Runtime.Event; -using System; -using System.Globalization; - -namespace Org.Apache.Reef.Common -{ - public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime)); - - private readonly string _evaluatorId; - - private readonly ContextManager _contextManager; - - private readonly HeartBeatManager _heartBeatManager; - - private readonly IRemoteManager<REEFMessage> _remoteManager; - - private readonly IClock _clock; - - private State _state = State.INIT; - - private IDisposable _evaluatorControlChannel; - - [Inject] - public EvaluatorRuntime( - ContextManager contextManager, - HeartBeatManager heartBeatManager) - { - using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime")) - { - _clock = heartBeatManager.EvaluatorSettings.RuntimeClock; - _heartBeatManager = heartBeatManager; - _contextManager = contextManager; - _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId; - _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager; - - ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver(); - - // subscribe to driver proto message - driverObserver.Subscribe(o => OnNext(o.Message)); - - // register the driver observer - _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver); - - // start the hearbeat - _clock.ScheduleAlarm(0, heartBeatManager); - } - } - - public State State - { - get - 
{ - return _state; - } - } - - public void Handle(EvaluatorControlProto message) - { - lock (_heartBeatManager) - { - LOGGER.Log(Level.Info, "Handle Evaluator control message"); - if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase)) - { - Handle(new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId))); - } - else if (_state != State.RUNNING) - { - Handle(new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state))); - } - else - { - if (message.context_control != null) - { - LOGGER.Log(Level.Info, "Send task control message to ContextManager"); - try - { - _contextManager.HandleTaskControl(message.context_control); - if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING) - { - LOGGER.Log(Level.Info, "Context stack is empty, done"); - _state = State.DONE; - _heartBeatManager.OnNext(GetEvaluatorStatus()); - _clock.Dispose(); - } - } - catch (Exception e) - { - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); - Handle(e); - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER); - } - } - if (message.kill_evaluator != null) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId)); - _state = State.KILLED; - _clock.Dispose(); - } - } - } - } - - public EvaluatorStatusProto GetEvaluatorStatus() - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state)); - EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto() - { - evaluator_id = _evaluatorId, - state = _state - }; - return evaluatorStatusProto; - } - - public void OnNext(RuntimeStart runtimeStart) 
- { - lock (_evaluatorId) - { - try - { - LOGGER.Log(Level.Info, "Runtime start"); - if (_state != State.INIT) - { - var e = new InvalidOperationException("State should be init."); - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - _state = State.RUNNING; - _contextManager.Start(); - _heartBeatManager.OnNext(); - } - catch (Exception e) - { - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); - Handle(e); - } - } - } - - void IObserver<RuntimeStart>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<REEFMessage>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<REEFMessage>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStop>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStop>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStart>.OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnNext(RuntimeStop runtimeStop) - { - LOGGER.Log(Level.Info, "Runtime stop"); - _contextManager.Dispose(); - - if (_state == State.RUNNING) - { - _state = State.DONE; - _heartBeatManager.OnNext(); - } - try - { - _evaluatorControlChannel.Dispose(); - } - catch (Exception e) - { - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER); - } - LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete"); - } - - public void OnNext(REEFMessage value) - { - if (value != null && value.evaluatorControl != null) - { - LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl"); - Handle(value.evaluatorControl); - } - } - - private void Handle(Exception e) - { - lock (_heartBeatManager) - { - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} 
failed with exception", _evaluatorId), e); - _state = State.FAILED; - string errorMessage = string.Format( - CultureInfo.InvariantCulture, - "failed with error [{0}] with mesage [{1}] and stack trace [{2}]", - e, - e.Message, - e.StackTrace); - EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto() - { - evaluator_id = _evaluatorId, - error = ByteUtilities.StringToByteArrays(errorMessage), - state = _state - }; - _heartBeatManager.OnNext(evaluatorStatusProto); - _contextManager.Dispose(); - } - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs deleted file mode 100644 index 067a0a0..0000000 --- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/EvaluatorSettings.cs +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -using Org.Apache.Reef.Common.Evaluator; -using Org.Apache.Reef.Common.Evaluator.Context; -using Org.Apache.Reef.Common.io; -using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol; -using Org.Apache.Reef.Tang.Interface; -using Org.Apache.Reef.Wake.Remote; -using Org.Apache.Reef.Wake.Time; -using System; - -namespace Org.Apache.Reef.Evaluator -{ - // TODO: merge with EvaluatorConfigurations class - public class EvaluatorSettings - { - private string _applicationId; - - private string _evaluatorId; - - private int _heartBeatPeriodInMs; - - private int _maxHeartbeatRetries; - - private ContextConfiguration _rootContextConfig; - - private IClock _clock; - - private IRemoteManager<REEFMessage> _remoteManager; - - private IInjector _injector; - - private EvaluatorOperationState _operationState; - - private INameClient _nameClient; - - public EvaluatorSettings( - string applicationId, - string evaluatorId, - int heartbeatPeriodInMs, - int maxHeartbeatRetries, - ContextConfiguration rootContextConfig, - IClock clock, - IRemoteManager<REEFMessage> remoteManager, - IInjector injecor) - { - if (string.IsNullOrWhiteSpace(evaluatorId)) - { - throw new ArgumentNullException("evaluatorId"); - } - if (rootContextConfig == null) - { - throw new ArgumentNullException("rootContextConfig"); - } - if (clock == null) - { - throw new ArgumentNullException("clock"); - } - if (remoteManager == null) - { - throw new ArgumentNullException("remoteManager"); - } - if (injecor == null) - { - throw new ArgumentNullException("injecor"); - } - _applicationId = applicationId; - _evaluatorId = evaluatorId; - _heartBeatPeriodInMs = heartbeatPeriodInMs; - _maxHeartbeatRetries = maxHeartbeatRetries; - _rootContextConfig = rootContextConfig; - _clock = clock; - _remoteManager = remoteManager; - _injector = injecor; - _operationState = EvaluatorOperationState.OPERATIONAL; - } - - public EvaluatorOperationState OperationState - { - get - { - return _operationState; - } - - set - { - _operationState = 
value; - } - } - - public string EvalutorId - { - get - { - return _evaluatorId; - } - } - - public int HeartBeatPeriodInMs - { - get - { - return _heartBeatPeriodInMs; - } - } - - public string ApplicationId - { - get - { - return _applicationId; - } - } - - public int MaxHeartbeatFailures - { - get - { - return _maxHeartbeatRetries; - } - } - - public ContextConfiguration RootContextConfig - { - get - { - return _rootContextConfig; - } - } - - public IClock RuntimeClock - { - get - { - return _clock; - } - } - - public INameClient NameClient - { - get - { - return _nameClient; - } - - set - { - _nameClient = value; - } - } - - public IRemoteManager<REEFMessage> RemoteManager - { - get - { - return _remoteManager; - } - } - - public IInjector Injector - { - get - { - return _injector; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs deleted file mode 100644 index e495fda..0000000 --- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/HeartBeatManager.cs +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.Reef.Common.Context; -using Org.Apache.Reef.Common.Evaluator; -using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto; -using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol; -using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto; -using Org.Apache.Reef.Common.Runtime; -using Org.Apache.Reef.Evaluator; -using Org.Apache.Reef.Utilities; -using Org.Apache.Reef.Utilities.Logging; -using Org.Apache.Reef.Wake.Remote; -using Org.Apache.Reef.Wake.Remote.Impl; -using Org.Apache.Reef.Wake.Time; -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Globalization; -using System.Linq; -using System.Net; -using System.Threading; - -namespace Org.Apache.Reef.Common -{ - public class HeartBeatManager : IObserver<Alarm> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager)); - - private static readonly MachineStatus MachineStatus = new MachineStatus(); - - private readonly IRemoteManager<REEFMessage> _remoteManager; - - private readonly IClock _clock; - - private readonly int _heartBeatPeriodInMillSeconds; - - private readonly int _maxHeartbeatRetries = 0; - - private readonly string _evaluatorId; - - private IRemoteIdentifier _remoteId; - - private IObserver<REEFMessage> _observer; - - private int _heartbeatFailures = 0; - - private IDriverConnection _driverConnection; - - private EvaluatorSettings _evaluatorSettings; - - // the queue can only contains the following: - // 1. 
all failed heartbeats (regular and event-based) before entering RECOVERY state - // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat) - private Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>(); - - public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId) - { - using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager")) - { - _remoteManager = settings.RemoteManager; - _remoteId = remoteId; - _evaluatorId = settings.EvalutorId; - _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId)); - _clock = settings.RuntimeClock; - _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs; - _maxHeartbeatRetries = settings.MaxHeartbeatFailures; - EvaluatorSettings = settings; - MachineStatus.ToString(); // kick start the CPU perf counter - } - } - - [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")] - public EvaluatorRuntime _evaluatorRuntime { get; set; } - - [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")] - public ContextManager _contextManager { get; set; } - - public EvaluatorSettings EvaluatorSettings - { - get - { - return _evaluatorSettings; - } - - private set - { - _evaluatorSettings = value; - } - } - - public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto) - { - lock (_queuedHeartbeats) - { - if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY) - { - LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. 
", evaluatorHeartbeatProto)); - _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto); - return; - } - - // NOT during recovery, try to send - REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto); - try - { - _observer.OnNext(payload); - _heartbeatFailures = 0; // reset failure counts if we are having intermidtten (not continuous) failures - } - catch (Exception e) - { - if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING) - { - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER); - } - - _heartbeatFailures++; - - _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto); - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e); - - if (_heartbeatFailures >= _maxHeartbeatRetries) - { - LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures)); - try - { - _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>(); - } - catch (Exception ex) - { - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER); - } - LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection); - _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY; - - // clean heartbeat failure - _heartbeatFailures = 0; - } - } - } - } - - /// <summary> - /// Assemble a complete new heartbeat and send it out. 
- /// </summary> - public void OnNext() - { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()"); - EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(); - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); - Send(heartbeatProto); - } - } - - /// <summary> - /// Called with a specific TaskStatus that must be delivered to the driver - /// </summary> - /// <param name="taskStatusProto"></param> - public void OnNext(TaskStatusProto taskStatusProto) - { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)"); - EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), - _contextManager.GetContextStatusCollection(), - Optional<TaskStatusProto>.Of(taskStatusProto)); - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); - Send(heartbeatProto); - } - } - - /// <summary> - /// Called with a specific ContextStatusProto that must be delivered to the driver - /// </summary> - /// <param name="contextStatusProto"></param> - public void OnNext(ContextStatusProto contextStatusProto) - { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)"); - List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto>(); - contextStatusProtos.Add(contextStatusProto); - contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection()); - EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), - contextStatusProtos, - Optional<TaskStatusProto>.Empty()); - 
LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); - Send(heartbeatProto); - } - } - - /// <summary> - /// Called with a specific EvaluatorStatus that must be delivered to the driver - /// </summary> - /// <param name="evaluatorStatusProto"></param> - public void OnNext(EvaluatorStatusProto evaluatorStatusProto) - { - LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)"); - EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto() - { - timestamp = CurrentTimeMilliSeconds(), - evaluator_status = evaluatorStatusProto - }; - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); - Send(heartbeatProto); - } - } - - public void OnNext(Alarm value) - { - LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)"); - if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING) - { - EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto(); - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString())); - Send(evaluatorHeartbeatProto); - _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this); - } - else - { - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. 
", _evaluatorSettings.OperationState, _evaluatorRuntime.State)); - try - { - DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId); - if (driverInformation == null) - { - LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later."); - } - else - { - LOGGER.Log( - Level.Info, - string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId)); - Recover(driverInformation); - } - } - catch (Exception e) - { - // we do not want any exception to stop the query for driver status - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER); - } - _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this); - } - } - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - private static long CurrentTimeMilliSeconds() - { - // this is an implmenation to get current time milli second counted from Jan 1st, 1970 - // it is chose as such to be compatible with java implmentation - DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); - return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds; - } - - private void Recover(DriverInformation driverInformation) - { - IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier); - _remoteId = new SocketRemoteIdentifier(driverEndpoint); - _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId)); - lock (_evaluatorSettings) - { - if (_evaluatorSettings.NameClient != null) - { - try - { - LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server" + driverInformation.NameServerId); - 
_evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId)); - LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId); - } - catch (Exception e) - { - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); - } - } - } - - lock (_queuedHeartbeats) - { - bool firstHeartbeatInQueue = true; - while (_queuedHeartbeats.Any()) - { - LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId); - try - { - if (firstHeartbeatInQueue) - { - // first heartbeat is specially construted to include the recovery flag - EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue()); - LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat); - _observer.OnNext(new REEFMessage(recoveryHeartbeat)); - firstHeartbeatInQueue = false; - } - else - { - _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue())); - } - } - catch (Exception e) - { - // we do not handle failures during RECOVERY - Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow( - e, - Level.Error, - string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId), - LOGGER); - } - Thread.Sleep(500); - } - } - _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL; - LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. 
==========="); - } - - private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat) - { - heartbeat.recovery = true; - heartbeat.context_status.ForEach(c => c.recovery = true); - heartbeat.task_status.recovery = true; - return heartbeat; - } - - private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto() - { - return GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), - _contextManager.GetContextStatusCollection(), - _contextManager.GetTaskStatus()); - } - - private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto( - EvaluatorStatusProto evaluatorStatusProto, - ICollection<ContextStatusProto> contextStatusProtos, - Optional<TaskStatusProto> taskStatusProto) - { - EvaluatorHeartbeatProto evaluatorHeartbeatProto = new EvaluatorHeartbeatProto() - { - timestamp = CurrentTimeMilliSeconds(), - evaluator_status = evaluatorStatusProto - }; - foreach (ContextStatusProto contextStatusProto in contextStatusProtos) - { - evaluatorHeartbeatProto.context_status.Add(contextStatusProto); - } - if (taskStatusProto.IsPresent()) - { - evaluatorHeartbeatProto.task_status = taskStatusProto.Value; - } - return evaluatorHeartbeatProto; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs deleted file mode 100644 index 8a7aa94..0000000 --- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/ReefMessageProtoObserver.cs +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.Reef.Common.ProtoBuf.ReefProtocol; -using Org.Apache.Reef.Utilities; -using Org.Apache.Reef.Utilities.Logging; -using Org.Apache.Reef.Wake.Remote; -using System; -using System.Globalization; -using System.Threading; - -namespace Org.Apache.Reef.Common -{ - public class ReefMessageProtoObserver : - IObserver<IRemoteMessage<REEFMessage>>, - IObservable<IRemoteMessage<REEFMessage>>, - IDisposable - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver)); - private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null; - private long _count = 0; - private DateTime _begin; - private DateTime _origBegin; - - public void OnCompleted() - { - } - - public void OnError(Exception error) - { - } - - public void OnNext(IRemoteMessage<REEFMessage> value) - { - REEFMessage remoteEvent = value.Message; - IRemoteIdentifier id = value.Identifier; - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id)); - - if (remoteEvent.evaluatorControl != null) - { - if (remoteEvent.evaluatorControl.context_control != null) - { - string context_message = null; - string task_message = null; - - if 
(remoteEvent.evaluatorControl.context_control.context_message != null) - { - context_message = remoteEvent.evaluatorControl.context_control.context_message.ToString(); - } - if (remoteEvent.evaluatorControl.context_control.task_message != null) - { - task_message = ByteUtilities.ByteArrarysToString(remoteEvent.evaluatorControl.context_control.task_message); - } - - if (!(string.IsNullOrEmpty(context_message) && string.IsNullOrEmpty(task_message))) - { - LOGGER.Log(Level.Info, - string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", context_message, task_message)); - } - else if (remoteEvent.evaluatorControl.context_control.remove_context != null) - { - LOGGER.Log(Level.Info, - string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", remoteEvent.evaluatorControl.context_control.remove_context.context_id)); - } - else if (remoteEvent.evaluatorControl.context_control.add_context != null) - { - LOGGER.Log(Level.Info, - string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", remoteEvent.evaluatorControl.context_control.add_context.parent_context_id)); - } - else if (remoteEvent.evaluatorControl.context_control.start_task != null) - { - LOGGER.Log(Level.Info, - string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", remoteEvent.evaluatorControl.context_control.start_task.context_id)); - } - else if (remoteEvent.evaluatorControl.context_control.stop_task != null) - { - LOGGER.Log(Level.Info, "Control protobuf to stop task"); - } - else if (remoteEvent.evaluatorControl.context_control.suspend_task != null) - { - LOGGER.Log(Level.Info, "Control protobuf to suspend task"); - } - } - } - if (_count == 0) - { - _begin = DateTime.Now; - _origBegin = _begin; - } - var count = Interlocked.Increment(ref _count); - - int printBatchSize = 100000; - if (count % printBatchSize == 0) - { - DateTime end = DateTime.Now; - var diff 
= (end - _begin).TotalMilliseconds; - double seconds = diff / 1000.0; - long eventsPerSecond = (long)(printBatchSize / seconds); - _begin = DateTime.Now; - } - - var observer = _observer; - if (observer != null) - { - observer.OnNext(value); - } - } - - public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer) - { - if (_observer != null) - { - return null; - } - _observer = observer; - return this; - } - - public void Dispose() - { - _observer = null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs deleted file mode 100644 index 31194a7..0000000 --- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextClientCodeException.cs +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
using Org.Apache.Reef.Utilities;
using Org.Apache.Reef.Tang.Interface;
using System;

namespace Org.Apache.Reef.Common.Context
{
    /// <summary>
    /// Exception that carries the id of a failed context and, optionally,
    /// the id of that context's parent.
    /// </summary>
    public class ContextClientCodeException : Exception
    {
        /// <summary>
        /// Creates the exception. The base exception message is
        /// "Failure in context '&lt;contextId&gt;': &lt;message&gt;".
        /// </summary>
        /// <param name="contextId">The id of the failed context.</param>
        /// <param name="parentId">The id of the failed context's parent, if any.</param>
        /// <param name="message">The error message appended after the context id.</param>
        /// <param name="cause">The exception that caused the error; stored as the inner exception.</param>
        public ContextClientCodeException(
            string contextId,
            Optional<string> parentId,
            string message,
            Exception cause)
            : base(ComposeMessage(contextId, message), cause)
        {
            ContextId = contextId;
            ParentId = parentId;
        }

        /// <summary>
        /// The id of the failed context, as passed to the constructor.
        /// </summary>
        public string ContextId { get; private set; }

        /// <summary>
        /// The id of the failed context's parent, as passed to the constructor.
        /// </summary>
        public Optional<string> ParentId { get; private set; }

        /// <summary>
        /// Extracts a context id from the given configuration.
        /// </summary>
        /// <param name="c">The configuration; currently not inspected.</param>
        /// <returns>Currently always the empty string (see TODO below).</returns>
        public static string GetId(IConfiguration c)
        {
            // TODO: update after TANG is available
            return string.Empty;
        }

        /// <summary>
        /// Builds the text handed to the base <see cref="Exception"/> constructor.
        /// </summary>
        private static string ComposeMessage(string contextId, string message)
        {
            return string.Concat("Failure in context '", contextId, "': ", message);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs deleted file mode 100644 index ca6b949..0000000 --- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextConfiguration.cs +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
using Org.Apache.Reef.Utilities.Logging;
using Org.Apache.Reef.Tang.Formats;
using Org.Apache.Reef.Tang.Interface;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using Org.Apache.Reef.Tang.Types;

namespace Org.Apache.Reef.Common.Evaluator.Context
{
    /// <summary>
    /// Key/value view over an Avro-encoded context configuration string.
    /// Only <see cref="Id"/> and <see cref="ContainerDirectory"/> are functional;
    /// every other <see cref="IConfiguration"/> member throws
    /// <see cref="NotImplementedException"/>.
    /// </summary>
    public class ContextConfiguration : IConfiguration
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration));

        // Assigned once in the constructor and never replaced afterwards.
        private readonly Dictionary<string, string> _settings;

        /// <summary>
        /// Parses <paramref name="configString"/> with
        /// <c>AvroConfiguration.GetAvroConfigurationFromEmbeddedString</c> and copies
        /// every binding into an internal dictionary. A binding whose key contains
        /// <c>Constants.ContextIdentifier</c> is stored under exactly that key.
        /// <see cref="ContainerDirectory"/> is initialised to the current working directory.
        /// </summary>
        /// <param name="configString">The embedded Avro configuration string.</param>
        /// <remarks>
        /// When no binding supplies the context identifier, an error is logged and an
        /// <see cref="ArgumentException"/> is handed to <c>Exceptions.Throw</c>.
        /// Two bindings that resolve to the same key make <c>Dictionary.Add</c> throw.
        /// </remarks>
        public ContextConfiguration(string configString)
        {
            using (LOGGER.LogFunction("ContextConfiguration::ContextConfiguration"))
            {
                ContainerDirectory = Directory.GetCurrentDirectory();

                _settings = new Dictionary<string, string>();
                AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString);
                foreach (ConfigurationEntry config in avroConfiguration.Bindings)
                {
                    // Normalise into a local so the parsed entry itself is left untouched.
                    string key = config.key;
                    if (key.Contains(Reef.Evaluator.Constants.ContextIdentifier))
                    {
                        key = Reef.Evaluator.Constants.ContextIdentifier;
                        LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", key, config.value));
                    }
                    _settings.Add(key, config.value);
                }

                if (!_settings.ContainsKey(Reef.Evaluator.Constants.ContextIdentifier))
                {
                    string msg = "Required parameter ContextIdentifier not provided.";
                    LOGGER.Log(Level.Error, msg);
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER);
                }
            }
        }

        /// <summary>
        /// The value bound to <c>Constants.ContextIdentifier</c>.
        /// </summary>
        public string Id
        {
            get { return _settings[Reef.Evaluator.Constants.ContextIdentifier]; }
        }

        /// <summary>
        /// Defaults to the process's current directory at construction time; settable.
        /// </summary>
        public string ContainerDirectory { get; set; }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public IConfigurationBuilder newBuilder()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public string GetNamedParameter(INamedParameterNode np)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public IClassHierarchy GetClassHierarchy()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public ISet<object> GetBoundSet(INamedParameterNode np)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public IClassNode GetBoundConstructor(IClassNode cn)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public IClassNode GetBoundImplementation(IClassNode cn)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public IConstructorDef GetLegacyConstructor(IClassNode cn)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public ICollection<IClassNode> GetBoundImplementations()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public ICollection<IClassNode> GetBoundConstructors()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public ICollection<INamedParameterNode> GetNamedParameters()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public ICollection<IClassNode> GetLegacyConstructors()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public IList<object> GetBoundList(INamedParameterNode np)
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public IDictionary<INamedParameterNode, IList<object>> GetBoundList()
        {
            throw new NotImplementedException();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs deleted file mode 100644 index 9967258..0000000 --- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextLifeCycle.cs +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
using Org.Apache.Reef.Common.Events;
using System;
using System.Collections.Generic;

namespace Org.Apache.Reef.Common.Context
{
    /// <summary>
    /// Holds the handler and message-source sets of one context and is the place
    /// from which context life-cycle events are meant to be fired. Event dispatch
    /// is currently disabled (see the TODO notes in the individual methods).
    /// </summary>
    internal class ContextLifeCycle
    {
        private HashSet<IObserver<IContextStart>> _contextStartHandlers;

        private HashSet<IObserver<IContextStop>> _contextStopHandlers;

        private HashSet<IContextMessageSource> _contextMessageSources;

        /// <summary>
        /// Creates a life cycle for the given context id with empty handler
        /// and message-source sets.
        /// </summary>
        /// <param name="contextId">The id of the context.</param>
        public ContextLifeCycle(string contextId)
            : this(
                contextId,
                new HashSet<IObserver<IContextStart>>(),
                new HashSet<IObserver<IContextStop>>(),
                new HashSet<IContextMessageSource>())
        {
        }

        /// <summary>
        /// Creates a life cycle that keeps references to the supplied sets (no copies are made).
        /// </summary>
        /// <param name="id">The id of the context.</param>
        /// <param name="contextStartHandlers">Observers of context start events.</param>
        /// <param name="contextStopHandlers">Observers of context stop events.</param>
        /// <param name="contextMessageSources">The message sources of the context.</param>
        // @Inject
        public ContextLifeCycle(
            string id,
            HashSet<IObserver<IContextStart>> contextStartHandlers,
            HashSet<IObserver<IContextStop>> contextStopHandlers,
            HashSet<IContextMessageSource> contextMessageSources)
        {
            _contextMessageSources = contextMessageSources;
            _contextStopHandlers = contextStopHandlers;
            _contextStartHandlers = contextStartHandlers;
            Id = id;
        }

        /// <summary>
        /// The id of the context this life cycle belongs to.
        /// </summary>
        public string Id { get; private set; }

        /// <summary>
        /// The live (not copied) set of message sources.
        /// </summary>
        public HashSet<IContextMessageSource> ContextMessageSources
        {
            get { return _contextMessageSources; }
        }

        /// <summary>
        /// Intended to fire ContextStart to all registered event handlers.
        /// At present it only constructs the start event.
        /// </summary>
        public void Start()
        {
            IContextStart startEvent = new ContextStartImpl(Id);

            // TODO: enable
            //foreach (IObserver<IContextStart> startHandler in _contextStartHandlers)
            //{
            //    startHandler.OnNext(startEvent);
            //}
        }

        /// <summary>
        /// Intended to fire ContextStop to all registered event handlers.
        /// Currently a no-op.
        /// </summary>
        public void Close()
        {
            //IContextStop contextStop = new ContextStopImpl(Id);
            //foreach (IObserver<IContextStop> stopHandler in _contextStopHandlers)
            //{
            //    stopHandler.OnNext(contextStop);
            //}
        }

        /// <summary>
        /// Intended to hand a message to the context message handler. Currently a no-op.
        /// </summary>
        /// <param name="message">The message payload; ignored for now.</param>
        public void HandleContextMessage(byte[] message)
        {
            //contextMessageHandler.onNext(message);
        }

        /// <summary>
        /// Gets the configured ContextMessageSources.
        /// </summary>
        /// <returns>A new set holding the same elements (a shallow copy).</returns>
        public HashSet<IContextMessageSource> GetContextMessageSources()
        {
            HashSet<IContextMessageSource> snapshot = new HashSet<IContextMessageSource>(_contextMessageSources);
            return snapshot;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c1b5200f/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs b/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs deleted file mode 100644 index 15b09b9..0000000 --- a/lang/cs/Source/REEF/reef-common/ReefCommon/runtime/evaluator/context/ContextManager.cs +++ /dev/null @@ -1,362 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
using Org.Apache.Reef.Common.Evaluator.Context;
using Org.Apache.Reef.Common.ProtoBuf.EvaluatorRunTimeProto;
using Org.Apache.Reef.Common.ProtoBuf.ReefServiceProto;
using Org.Apache.Reef.Common.Task;
using Org.Apache.Reef.Evaluator;
using Org.Apache.Reef.Services;
using Org.Apache.Reef.Tasks;
using Org.Apache.Reef.Utilities;
using Org.Apache.Reef.Utilities.Logging;
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Globalization;
using System.Linq;

namespace Org.Apache.Reef.Common.Context
{
    /// <summary>
    /// Owns the stack of <see cref="ContextRuntime"/> instances and translates
    /// <see cref="ContextControlProto"/> messages into operations on that stack
    /// (add / remove context, start / stop / suspend task, deliver messages).
    /// </summary>
    /// <remarks>
    /// Context ids are compared with <see cref="StringComparison.OrdinalIgnoreCase"/>
    /// everywhere in this class.
    /// NOTE(review): mutating operations lock on the stack, but the read paths
    /// (<see cref="GetTaskStatus"/>, <see cref="GetContextStatusCollection"/> and the
    /// stop / suspend / message branches of <see cref="HandleTaskControl"/>) do not.
    /// Confirm that callers serialise access before relying on this from several threads.
    /// </remarks>
    public class ContextManager : IDisposable
    {
        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager));

        private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>();

        private readonly HeartBeatManager _heartBeatManager;

        private readonly RootContextLauncher _rootContextLauncher;

        /// <summary>
        /// Creates the manager and a <see cref="RootContextLauncher"/> built from the
        /// root context configuration found in the heartbeat manager's evaluator settings.
        /// </summary>
        /// <param name="heartBeatManager">Used to report status changes and as the source of evaluator settings.</param>
        /// <param name="rootServiceConfig">Optional service configuration for the root context.</param>
        /// <param name="rootTaskConfig">Optional configuration of a task to launch together with the root context.</param>
        public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig)
        {
            using (LOGGER.LogFunction("ContextManager::ContextManager"))
            {
                _heartBeatManager = heartBeatManager;
                _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig);
            }
        }

        /// <summary>
        /// Start the context manager. This obtains the root context from the launcher,
        /// pushes it onto the stack and, when a root task configuration is present,
        /// starts that task. A <see cref="TaskClientCodeException"/> raised while starting
        /// the task is reported through the heartbeat manager instead of being propagated.
        /// </summary>
        public void Start()
        {
            lock (_contextStack)
            {
                ContextRuntime rootContext = _rootContextLauncher.GetRootContext();
                LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id));
                _contextStack.Push(rootContext);

                if (_rootContextLauncher.RootTaskConfig.IsPresent())
                {
                    LOGGER.Log(Level.Info, "Launching the initial Task");
                    try
                    {
                        _contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager);
                    }
                    catch (TaskClientCodeException e)
                    {
                        Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER);
                        HandleTaskException(e);
                    }
                }
            }
        }

        /// <summary>
        /// Tells whether no context is currently on the stack.
        /// </summary>
        /// <returns><c>true</c> when the stack holds no context.</returns>
        public bool ContextStackIsEmpty()
        {
            lock (_contextStack)
            {
                return _contextStack.Count == 0;
            }
        }

        // TODO: codes here are slightly different from java since the protobuf.net does not generate the HasXXX method, may want to switch to proto-port later

        /// <summary>
        /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts.
        /// The first populated field, in the order add_context, remove_context, start_task,
        /// stop_task, suspend_task, task_message, context_message, decides the action.
        /// A message carrying both add_context and remove_context is rejected.
        /// </summary>
        /// <param name="controlMessage">The control message received from the driver.</param>
        /// <remarks>
        /// Any exception is passed to <c>Exceptions.CaughtAndThrow</c> after task / context
        /// client-code exceptions have first been reported through the heartbeat manager.
        /// </remarks>
        public void HandleTaskControl(ContextControlProto controlMessage)
        {
            try
            {
                byte[] message = controlMessage.task_message;
                if (controlMessage.add_context != null && controlMessage.remove_context != null)
                {
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER);
                }
                if (controlMessage.add_context != null)
                {
                    LOGGER.Log(Level.Info, "AddContext");
                    AddContext(controlMessage.add_context);

                    // support submitContextAndTask()
                    if (controlMessage.start_task != null)
                    {
                        LOGGER.Log(Level.Info, "StartTask");
                        StartTask(controlMessage.start_task);
                    }
                    else
                    {
                        // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime
                        // Therefore this call can not go into addContext
                        LOGGER.Log(Level.Info, "Trigger Heartbeat");
                        _heartBeatManager.OnNext();
                    }
                }
                else if (controlMessage.remove_context != null)
                {
                    LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id));
                    RemoveContext(controlMessage.remove_context.context_id);
                }
                else if (controlMessage.start_task != null)
                {
                    LOGGER.Log(Level.Info, "StartTask only");
                    StartTask(controlMessage.start_task);
                }
                else if (controlMessage.stop_task != null)
                {
                    LOGGER.Log(Level.Info, "CloseTask");
                    _contextStack.Peek().CloseTask(message);
                }
                else if (controlMessage.suspend_task != null)
                {
                    LOGGER.Log(Level.Info, "SuspendTask");
                    _contextStack.Peek().SuspendTask(message);
                }
                else if (controlMessage.task_message != null)
                {
                    LOGGER.Log(Level.Info, "DeliverTaskMessage");
                    _contextStack.Peek().DeliverTaskMessage(message);
                }
                else if (controlMessage.context_message != null)
                {
                    LOGGER.Log(Level.Info, "Handle context control message");
                    DeliverContextMessage(controlMessage.context_message);
                }
                else
                {
                    InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString()));
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                }
            }
            catch (Exception e)
            {
                TaskClientCodeException taskException = e as TaskClientCodeException;
                ContextClientCodeException contextException = e as ContextClientCodeException;
                if (taskException != null)
                {
                    HandleTaskException(taskException);
                }
                else if (contextException != null)
                {
                    HandleContextException(contextException);
                }
                Org.Apache.Reef.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER);
            }
        }

        /// <summary>
        /// Get TaskStatusProto of the currently running task, if there is any
        /// </summary>
        /// <returns>
        /// An empty optional when no context is on the stack; otherwise whatever the
        /// top context reports from its <c>GetTaskStatus()</c>.
        /// </returns>
        public Optional<TaskStatusProto> GetTaskStatus()
        {
            if (_contextStack.Count == 0)
            {
                // Deliberately not an error: a status may be requested before any context runs.
                return Optional<TaskStatusProto>.Empty();
            }
            return _contextStack.Peek().GetTaskStatus();
        }

        /// <summary>
        /// get status of all contexts in the stack.
        /// </summary>
        /// <returns>
        /// One status per context, in the stack's enumeration order (top of the stack first).
        /// </returns>
        public ICollection<ContextStatusProto> GetContextStatusCollection()
        {
            ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>();
            foreach (ContextRuntime runtime in _contextStack)
            {
                ContextStatusProto contextStatusProto = runtime.GetContextStatus();
                LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto));
                result.Add(contextStatusProto);
            }
            return result;
        }

        /// <summary>
        /// Shuts down. When the stack is not empty, disposes the bottom-most context
        /// (the first one pushed, i.e. the root context pushed by <see cref="Start"/>).
        /// </summary>
        /// <remarks>
        /// NOTE(review): presumably <c>ContextRuntime.Dispose</c> on the root also tears
        /// down its task and child contexts from the top down - confirm against ContextRuntime.
        /// </remarks>
        public void Dispose()
        {
            lock (_contextStack)
            {
                if (_contextStack.Any())
                {
                    LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime.");

                    // Stack<T> enumerates from the top down, so Last() is the bottom element.
                    ContextRuntime runtime = _contextStack.Last();
                    if (runtime != null)
                    {
                        runtime.Dispose();
                    }
                }
            }
        }

        /// <summary>
        /// Hands a context message to the context on the stack whose id matches
        /// (ignoring case, like every other id check in this class). Raises an
        /// <see cref="InvalidOperationException"/> via <c>Exceptions.Throw</c> when
        /// no context matches.
        /// </summary>
        /// <param name="contextMessageProto">The context message taken from the control message.</param>
        private void DeliverContextMessage(ContextMessageProto contextMessageProto)
        {
            foreach (ContextRuntime context in _contextStack)
            {
                if (string.Equals(context.Id, contextMessageProto.context_id, StringComparison.OrdinalIgnoreCase))
                {
                    LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", contextMessageProto.message));
                    context.HandleContextMessaage(contextMessageProto.message);
                    return;
                }
            }

            InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id));
            Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
        }

        /// <summary>
        /// Add a context to the stack. The new context is spawned as a child of the current
        /// top context, which must be the parent named in <paramref name="addContextProto"/>.
        /// </summary>
        /// <param name="addContextProto">Parent id, context configuration and optional service configuration.</param>
        private void AddContext(AddContextProto addContextProto)
        {
            lock (_contextStack)
            {
                ContextRuntime currentTopContext = _contextStack.Peek();
                if (!string.Equals(currentTopContext.Id, addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase))
                {
                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}",
                        addContextProto.parent_context_id,
                        currentTopContext.Id));
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                }

                ContextConfiguration contextConfiguration = new ContextConfiguration(addContextProto.context_configuration);
                ContextRuntime newTopContext;
                if (addContextProto.service_configuration != null)
                {
                    ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration);
                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig);
                }
                else
                {
                    newTopContext = currentTopContext.SpawnChildContext(contextConfiguration);
                }
                _contextStack.Push(newTopContext);
            }
        }

        /// <summary>
        /// Remove the context with the given ID from the stack. Only the top context can be removed.
        /// </summary>
        /// <param name="contextId">The id of the context to close; must match the top context's id.</param>
        private void RemoveContext(string contextId)
        {
            lock (_contextStack)
            {
                ContextRuntime currentTopContext = _contextStack.Peek();
                if (!string.Equals(contextId, currentTopContext.Id, StringComparison.OrdinalIgnoreCase))
                {
                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContext.Id));
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                }

                currentTopContext.Dispose();
                if (_contextStack.Count > 1)
                {
                    // We did not close the root context. Therefore, we need to inform the
                    // driver explicitly that this context is closed. The root context notification
                    // is implicit in the Evaluator close/done notification.
                    _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state
                }

                // Popped only after the heartbeat so the disposed context is still on the stack when status is reported.
                _contextStack.Pop();
            }
        }

        /// <summary>
        /// Launch a Task on the current top context, which must be the context named in
        /// <paramref name="startTaskProto"/>.
        /// </summary>
        /// <param name="startTaskProto">Target context id and task configuration.</param>
        private void StartTask(StartTaskProto startTaskProto)
        {
            lock (_contextStack)
            {
                ContextRuntime currentActiveContext = _contextStack.Peek();
                string expectedContextId = startTaskProto.context_id;
                if (!string.Equals(expectedContextId, currentActiveContext.Id, StringComparison.OrdinalIgnoreCase))
                {
                    var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id));
                    Org.Apache.Reef.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER);
                }
                TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration);
                currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager);
            }
        }

        /// <summary>
        /// Reports a failed task to the driver: builds a FAILED <see cref="TaskStatusProto"/>
        /// whose result is the exception text and passes it to the heartbeat manager.
        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
        /// </summary>
        /// <param name="e">The task failure to report.</param>
        private void HandleTaskException(TaskClientCodeException e)
        {
            LOGGER.Log(Level.Error, "TaskClientCodeException", e);
            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
            TaskStatusProto taskStatus = new TaskStatusProto()
            {
                context_id = e.ContextId,
                task_id = e.TaskId,
                result = exception,
                state = State.FAILED
            };
            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed task: {0}", taskStatus.ToString()));
            _heartBeatManager.OnNext(taskStatus);
        }

        /// <summary>
        /// Reports a failed context to the driver: builds a FAIL <see cref="ContextStatusProto"/>
        /// whose error is the exception text (plus the parent id when present) and passes it
        /// to the heartbeat manager.
        /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager
        /// </summary>
        /// <param name="e">The context failure to report.</param>
        private void HandleContextException(ContextClientCodeException e)
        {
            LOGGER.Log(Level.Error, "ContextClientCodeException", e);
            byte[] exception = ByteUtilities.StringToByteArrays(e.ToString());
            ContextStatusProto contextStatusProto = new ContextStatusProto()
            {
                context_id = e.ContextId,
                context_state = ContextStatusProto.State.FAIL,
                error = exception
            };
            if (e.ParentId.IsPresent())
            {
                contextStatusProto.parent_id = e.ParentId.Value;
            }
            LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString()));
            _heartBeatManager.OnNext(contextStatusProto);
        }
    }
}
\ No newline at end of file
