http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto deleted file mode 100644 index 1415e5c..0000000 --- a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/evaluator_runtime.proto +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -option java_package = "org.apache.reef.proto"; -option java_outer_classname = "EvaluatorRuntimeProtocol"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; - -import "reef_service_protos.proto"; - -// Stop the evaluator -message StopEvaluatorProto { -} - -// Kill the evaluator -message KillEvaluatorProto { -} - -// Start a task -message StartTaskProto { - required string context_id = 1; - required string configuration = 2; -} - -message AddContextProto { - required string parent_context_id = 1; - required string context_configuration = 2; - optional string service_configuration = 3; -} - -message RemoveContextProto { - required string context_id = 1; -} - -// Stop the task -message StopTaskProto { -} - -// Suspend the task -message SuspendTaskProto { -} - -///////////////////////////////////////// -// Message aggregators - -message ContextMessageProto { - required string context_id = 1; - required bytes message = 2; -} - -message ContextControlProto { - optional bytes task_message = 1; - optional ContextMessageProto context_message = 2; - - optional AddContextProto add_context = 5; - optional RemoveContextProto remove_context = 6; - optional StartTaskProto start_task = 7; - optional StopTaskProto stop_task = 8; - optional SuspendTaskProto suspend_task = 9; -} - -message EvaluatorHeartbeatProto { - required int64 timestamp = 1; - required EvaluatorStatusProto evaluator_status = 2; - repeated ContextStatusProto context_status = 3; - optional TaskStatusProto task_status = 4; - optional bool recovery = 5; -} - -message EvaluatorControlProto { - required int64 timestamp = 1; - required string identifier = 2; - - optional ContextControlProto context_control = 3; - optional KillEvaluatorProto kill_evaluator = 4; -}
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto deleted file mode 100644 index 6b99415..0000000 --- a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_protocol.proto +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import "client_runtime.proto"; - -import "evaluator_runtime.proto"; - -import "reef_service_protos.proto"; - - -option java_package = "com.Org.Apache.REEF.proto"; - -option java_generic_services = true; - -option java_generate_equals_and_hash = true; - -option java_outer_classname = "REEFProtocol"; - -message REEFMessage { - // Messages defined in client_runtime.proto - optional JobSubmissionProto jobSubmission = 1; - optional JobControlProto jobControl = 2; - // Messages defined in reef_service_protos.proto - optional RuntimeErrorProto runtimeError = 3; - optional JobStatusProto jobStatus = 4; - // Messages from evaluator_runtime.proto - optional EvaluatorControlProto evaluatorControl = 5; - optional EvaluatorHeartbeatProto evaluatorHeartBeat = 6; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto b/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto deleted file mode 100644 index a553ca9..0000000 --- a/lang/cs/Org.Apache.REEF.Common/protobuf/proto/reef_service_protos.proto +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -option java_package = "org.apache.reef.reef.proto"; - -option java_outer_classname = "ReefServiceProtos"; - -option java_generic_services = true; - -option java_generate_equals_and_hash = true; - -enum State { - INIT = 0; - RUNNING = 1; - DONE = 2; - SUSPEND = 3; - FAILED = 4; - KILLED = 5; -} - -enum FileType { - PLAIN = 0; - LIB = 1; - ARCHIVE = 2; -} - -// Removed in REEF 0.3 in favor of explicit memory sizes. -// enum SIZE { -// SMALL = 0; -// MEDIUM = 1; -// LARGE = 2; -// XLARGE = 3; -//} - -enum ProcessType { - JVM = 0; - CLR = 1; -} - -message FileResourceProto { - required FileType type = 1; - required string name = 2; - required string path = 3; -} - -message RuntimeErrorProto { - required string name = 1; // e.g., local, yarn21 - required string message = 2; - optional bytes exception = 3; - - optional string identifier = 5; // e.g., evaluator id -} - -message JobStatusProto { - required string identifier = 1; - required State state = 2; - optional bytes message = 3; - optional bytes exception = 4; -} - -message ContextStatusProto { - enum State { - READY = 0; - DONE = 1; - FAIL = 2; - } - required State context_state = 1; - - required string context_id = 2; - optional string parent_id = 3; - - optional bytes error = 5; // when creating the context - - optional bool recovery = 6; - // Context messages - message ContextMessageProto { - required string source_id = 1; - required bytes message = 2; - } - repeated ContextMessageProto context_message = 7; -} - -message TaskStatusProto { - required string task_id = 1; - required string context_id = 2; - required State state = 3; - optional bytes result = 4; // e.g., return value from Task.call() - optional bool recovery = 5; - - // TaskMessageSource messages - message TaskMessageProto { - required string source_id = 1; - required bytes message = 2; - } - repeated TaskMessageProto task_message = 6; -} - -message EvaluatorStatusProto { - required string evaluator_id = 1; - required State state = 2; - optional bytes error = 3; -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs b/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs deleted file mode 100644 index 54aca4c..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/MachineStatus.cs +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System; -using System.Diagnostics; -using System.Globalization; -using Org.Apache.REEF.Utilities.Logging; - -namespace Org.Apache.REEF.Common.Runtime -{ - public class MachineStatus - { - private static PerformanceCounter _cpuCounter; - - private static PerformanceCounter _ramCounter; - - private static PerformanceCounter _processCpuCounter; - - private static Process _process; - - private static bool _checkStatus; - - static MachineStatus() - { - _checkStatus = true; - _process = Process.GetCurrentProcess(); - string processName = _process.ProcessName; - - _cpuCounter = _cpuCounter ?? new PerformanceCounter() - { - CategoryName = "Processor", - CounterName = "% Processor Time", - InstanceName = "_Total", - }; - - _ramCounter = _ramCounter ?? new PerformanceCounter() - { - CategoryName = "Memory", - CounterName = "Available MBytes" - }; - - _processCpuCounter = _processCpuCounter ?? new PerformanceCounter() - { - CategoryName = "Process", - CounterName = "% Processor Time", - InstanceName = processName - }; - } - - public static string CurrentNodeCpuUsage - { - get - { - return _cpuCounter.NextValue() + "%"; - } - } - - public static string AvailableMemory - { - get - { - return _ramCounter.NextValue() + "MB"; - } - } - - public static string CurrentProcessMemoryUsage - { - get - { - return ((float)_process.WorkingSet64 / 1000000.0).ToString(CultureInfo.InvariantCulture) + "MB"; - } - } - - public static string PeakProcessMemoryUsage - { - get - { - return ((float)_process.PeakWorkingSet64 / 1000000.0).ToString(CultureInfo.InvariantCulture) + "MB"; - } - } - - // this may not be accurate if there are multiple evaluator processes running on a single machine - public static string CurrentProcessCpuUsage - { - get - { - return ((float)_processCpuCounter.RawValue / 1000000.0) + "%"; - } - } - - public override string ToString() - { - string info = "No machine status information retrieved. Could be due to lack of admin right to get the info."; - if (_checkStatus) - { - try - { - _process.Refresh(); - info = string.Format( - CultureInfo.InvariantCulture, - "current node is running at [{0}] CPU usage and with [{1}] memory available.{2} current evaluator process is using [{3}] of CPU and [{4}] of memory, with a peak memory usage of [{5}]", - CurrentNodeCpuUsage, - AvailableMemory, - Environment.NewLine, - CurrentProcessCpuUsage, - CurrentProcessMemoryUsage, - PeakProcessMemoryUsage); - } - catch (Exception e) - { - _checkStatus = false; // It only takes one exception to switch the cheking off for good. - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, "Cannot obtain machine status due to error", Logger.GetLogger(typeof(MachineStatus))); - // we do not want to crash the evealuator just because we cannot get the information. - info = "Cannot obtain machine status due to error " + e.Message; - } - } - - return info; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs deleted file mode 100644 index 97d705b..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/Constants.cs +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -namespace Org.Apache.REEF.Evaluator -{ - public class Constants - { - public const string RootContextConfiguration = "RootContextConfiguration"; - - public const string EvaluatorIdentifier = "EvaluatorIdentifier"; - - public const string RootServiceConfiguration = "RootServiceConfiguration"; - - public const string TaskConfiguration = "TaskConfiguration"; - - public const string ContextIdentifier = "ContextIdentifier"; - - public const string ApplicationIdentifier = "ApplicationIdentifier"; - - public const int DefaultEvaluatorHeartbeatPeriodInMs = 4000; - - public const int DefaultEvaluatorHeartbeatMaxRetry = 3; - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs deleted file mode 100644 index 217e24d..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorRuntime.cs +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto; -using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol; -using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; -using Org.Apache.REEF.Evaluator; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Time; -using Org.Apache.REEF.Wake.Time.Runtime.Event; -using System; -using System.Globalization; - -namespace Org.Apache.REEF.Common -{ - public class EvaluatorRuntime : IObserver<RuntimeStart>, IObserver<RuntimeStop>, IObserver<REEFMessage> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorRuntime)); - - private readonly string _evaluatorId; - - private readonly ContextManager _contextManager; - - private readonly HeartBeatManager _heartBeatManager; - - private readonly IRemoteManager<REEFMessage> _remoteManager; - - private readonly IClock _clock; - - private State _state = State.INIT; - - private IDisposable _evaluatorControlChannel; - - [Inject] - public EvaluatorRuntime( - ContextManager contextManager, - HeartBeatManager heartBeatManager) - { - using (LOGGER.LogFunction("EvaluatorRuntime::EvaluatorRuntime")) - { - _clock = heartBeatManager.EvaluatorSettings.RuntimeClock; - _heartBeatManager = heartBeatManager; - _contextManager = contextManager; - _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId; - _remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager; - - ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver(); - - // subscribe to driver proto message - driverObserver.Subscribe(o => OnNext(o.Message)); - - // register the driver observer - _evaluatorControlChannel = _remoteManager.RegisterObserver(driverObserver); - - // start the hearbeat - _clock.ScheduleAlarm(0, heartBeatManager); - } - } - - public State State - { - get - { - return _state; - } - } - - public void Handle(EvaluatorControlProto message) - { - lock (_heartBeatManager) - { - LOGGER.Log(Level.Info, "Handle Evaluator control message"); - if (!message.identifier.Equals(_evaluatorId, StringComparison.OrdinalIgnoreCase)) - { - Handle(new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", message.identifier, _evaluatorId))); - } - else if (_state != State.RUNNING) - { - Handle(new InvalidOperationException( - string.Format(CultureInfo.InvariantCulture, "Evaluator received a control message but its state is not {0} but rather {1}", State.RUNNING, _state))); - } - else - { - if (message.context_control != null) - { - LOGGER.Log(Level.Info, "Send task control message to ContextManager"); - try - { - _contextManager.HandleTaskControl(message.context_control); - if (_contextManager.ContextStackIsEmpty() && _state == State.RUNNING) - { - LOGGER.Log(Level.Info, "Context stack is empty, done"); - _state = State.DONE; - _heartBeatManager.OnNext(GetEvaluatorStatus()); - _clock.Dispose(); - } - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); - Handle(e); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException(e.ToString(), e), LOGGER); - } - } - if (message.kill_evaluator != null) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator {0} has been killed by the driver.", _evaluatorId)); - _state = State.KILLED; - _clock.Dispose(); - } - } - } - } - - public EvaluatorStatusProto GetEvaluatorStatus() - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Evaluator state : {0}", _state)); - EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto() - { - evaluator_id = _evaluatorId, - state = _state - }; - return evaluatorStatusProto; - } - - public void OnNext(RuntimeStart runtimeStart) - { - lock (_evaluatorId) - { - try - { - LOGGER.Log(Level.Info, "Runtime start"); - if (_state != State.INIT) - { - var e = new InvalidOperationException("State should be init."); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - _state = State.RUNNING; - _contextManager.Start(); - _heartBeatManager.OnNext(); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); - Handle(e); - } - } - } - - void IObserver<RuntimeStart>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<REEFMessage>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<REEFMessage>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStop>.OnCompleted() - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStop>.OnError(Exception error) - { - throw new NotImplementedException(); - } - - void IObserver<RuntimeStart>.OnCompleted() - { - throw new NotImplementedException(); - } - - public void OnNext(RuntimeStop runtimeStop) - { - LOGGER.Log(Level.Info, "Runtime stop"); - _contextManager.Dispose(); - - if (_state == State.RUNNING) - { - _state = State.DONE; - _heartBeatManager.OnNext(); - } - try - { - _evaluatorControlChannel.Dispose(); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, "Exception during shut down.", LOGGER); - } - LOGGER.Log(Level.Info, "EvaluatorRuntime shutdown complete"); - } - - public void OnNext(REEFMessage value) - { - if (value != null && value.evaluatorControl != null) - { - LOGGER.Log(Level.Info, "Received a REEFMessage with EvaluatorControl"); - Handle(value.evaluatorControl); - } - } - - private void Handle(Exception e) - { - lock (_heartBeatManager) - { - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with exception", _evaluatorId), e); - _state = State.FAILED; - string errorMessage = string.Format( - CultureInfo.InvariantCulture, - "failed with error [{0}] with mesage [{1}] and stack trace [{2}]", - e, - e.Message, - e.StackTrace); - EvaluatorStatusProto evaluatorStatusProto = new EvaluatorStatusProto() - { - evaluator_id = _evaluatorId, - error = ByteUtilities.StringToByteArrays(errorMessage), - state = _state - }; - _heartBeatManager.OnNext(evaluatorStatusProto); - _contextManager.Dispose(); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs deleted file mode 100644 index bc939d9..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/EvaluatorSettings.cs +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Evaluator; -using Org.Apache.REEF.Common.Evaluator.Context; -using Org.Apache.REEF.Common.io; -using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Time; -using System; - -namespace Org.Apache.REEF.Evaluator -{ - // TODO: merge with EvaluatorConfigurations class - public class EvaluatorSettings - { - private string _applicationId; - - private string _evaluatorId; - - private int _heartBeatPeriodInMs; - - private int _maxHeartbeatRetries; - - private ContextConfiguration _rootContextConfig; - - private IClock _clock; - - private IRemoteManager<REEFMessage> _remoteManager; - - private IInjector _injector; - - private EvaluatorOperationState _operationState; - - private INameClient _nameClient; - - public EvaluatorSettings( - string applicationId, - string evaluatorId, - int heartbeatPeriodInMs, - int maxHeartbeatRetries, - ContextConfiguration rootContextConfig, - IClock clock, - IRemoteManager<REEFMessage> remoteManager, - IInjector injecor) - { - if (string.IsNullOrWhiteSpace(evaluatorId)) - { - throw new ArgumentNullException("evaluatorId"); - } - if (rootContextConfig == null) - { - throw new ArgumentNullException("rootContextConfig"); - } - if (clock == null) - { - throw new ArgumentNullException("clock"); - } - if (remoteManager == null) - { - throw new ArgumentNullException("remoteManager"); - } - if (injecor == null) - { - throw new ArgumentNullException("injecor"); - } - _applicationId = applicationId; - _evaluatorId = evaluatorId; - _heartBeatPeriodInMs = heartbeatPeriodInMs; - _maxHeartbeatRetries = maxHeartbeatRetries; - _rootContextConfig = rootContextConfig; - _clock = clock; - _remoteManager = remoteManager; - _injector = injecor; - _operationState = EvaluatorOperationState.OPERATIONAL; - } - - public EvaluatorOperationState OperationState - { - get - { - return _operationState; - } - - set - { - _operationState = value; - } - } - - public string EvalutorId - { - get - { - return _evaluatorId; - } - } - - public int HeartBeatPeriodInMs - { - get - { - return _heartBeatPeriodInMs; - } - } - - public string ApplicationId - { - get - { - return _applicationId; - } - } - - public int MaxHeartbeatFailures - { - get - { - return _maxHeartbeatRetries; - } - } - - public ContextConfiguration RootContextConfig - { - get - { - return _rootContextConfig; - } - } - - public IClock RuntimeClock - { - get - { - return _clock; - } - } - - public INameClient NameClient - { - get - { - return _nameClient; - } - - set - { - _nameClient = value; - } - } - - public IRemoteManager<REEFMessage> RemoteManager - { - get - { - return _remoteManager; - } - } - - public IInjector Injector - { - get - { - return _injector; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs deleted file mode 100644 index 6d2121e..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/HeartBeatManager.cs +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Common.Evaluator; -using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto; -using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol; -using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; -using Org.Apache.REEF.Common.Runtime; -using Org.Apache.REEF.Evaluator; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; -using Org.Apache.REEF.Wake.Time; -using System; -using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; -using System.Globalization; -using System.Linq; -using System.Net; -using System.Threading; - -namespace Org.Apache.REEF.Common -{ - public class HeartBeatManager : IObserver<Alarm> - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager)); - - private static readonly MachineStatus MachineStatus = new MachineStatus(); - - private readonly IRemoteManager<REEFMessage> _remoteManager; - - private readonly IClock _clock; - - private readonly int _heartBeatPeriodInMillSeconds; - - private readonly int _maxHeartbeatRetries = 0; - - private readonly string _evaluatorId; - - private IRemoteIdentifier _remoteId; - - private IObserver<REEFMessage> _observer; - - private int _heartbeatFailures = 0; - - private IDriverConnection _driverConnection; - - private EvaluatorSettings _evaluatorSettings; - - // the queue can only contains the following: - // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state - // 2. event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat) - private Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>(); - - public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId) - { - using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager")) - { - _remoteManager = settings.RemoteManager; - _remoteId = remoteId; - _evaluatorId = settings.EvalutorId; - _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId)); - _clock = settings.RuntimeClock; - _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs; - _maxHeartbeatRetries = settings.MaxHeartbeatFailures; - EvaluatorSettings = settings; - MachineStatus.ToString(); // kick start the CPU perf counter - } - } - - [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")] - public EvaluatorRuntime _evaluatorRuntime { get; set; } - - [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")] - public ContextManager _contextManager { get; set; } - - public EvaluatorSettings EvaluatorSettings - { - get - { - return _evaluatorSettings; - } - - private set - { - _evaluatorSettings = value; - } - } - - public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto) - { - lock (_queuedHeartbeats) - { - if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY) - { - LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued as [{0}]. ", evaluatorHeartbeatProto)); - _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto); - return; - } - - // NOT during recovery, try to send - REEFMessage payload = new REEFMessage(evaluatorHeartbeatProto); - try - { - _observer.OnNext(payload); - _heartbeatFailures = 0; // reset failure counts if we are having intermidtten (not continuous) failures - } - catch (Exception e) - { - if (evaluatorHeartbeatProto.task_status == null || evaluatorHeartbeatProto.task_status.state != State.RUNNING) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, "Lost communications to driver when no task is running, recovery NOT supported for such scenario", LOGGER); - } - - _heartbeatFailures++; - - _queuedHeartbeats.Enqueue(evaluatorHeartbeatProto); - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e); - - if (_heartbeatFailures >= _maxHeartbeatRetries) - { - LOGGER.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Heartbeat communications to driver reached max of {0} failures. \n==== Driver is considered dead/unreachable. === \n=========== Entering RECOVERY mode. ===========", _heartbeatFailures)); - try - { - _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>(); - } - catch (Exception ex) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER); - } - LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection); - _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY; - - // clean heartbeat failure - _heartbeatFailures = 0; - } - } - } - } - - /// <summary> - /// Assemble a complete new heartbeat and send it out. - /// </summary> - public void OnNext() - { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()"); - EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto(); - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); - Send(heartbeatProto); - } - } - - /// <summary> - /// Called with a specific TaskStatus that must be delivered to the driver - /// </summary> - /// <param name="taskStatusProto"></param> - public void OnNext(TaskStatusProto taskStatusProto) - { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)"); - EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), - _contextManager.GetContextStatusCollection(), - Optional<TaskStatusProto>.Of(taskStatusProto)); - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); - Send(heartbeatProto); - } - } - - /// <summary> - /// Called with a specific ContextStatusProto that must be delivered to the driver - /// </summary> - /// <param name="contextStatusProto"></param> - public void OnNext(ContextStatusProto contextStatusProto) - { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)"); - List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto>(); - contextStatusProtos.Add(contextStatusProto); - contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection()); - EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), - contextStatusProtos, - Optional<TaskStatusProto>.Empty()); - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); - Send(heartbeatProto); - } - } - - /// <summary> - /// Called with a specific EvaluatorStatus that must be delivered to the driver - /// </summary> - /// <param name="evaluatorStatusProto"></param> - public void OnNext(EvaluatorStatusProto evaluatorStatusProto) - { - LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)"); - EvaluatorHeartbeatProto heartbeatProto = new EvaluatorHeartbeatProto() - { - timestamp = CurrentTimeMilliSeconds(), - evaluator_status = evaluatorStatusProto - }; - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); - Send(heartbeatProto); - } - } - - public void OnNext(Alarm value) - { - LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)"); - lock (this) - { - LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)"); - if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING) - { - EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto(); - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString())); - Send(evaluatorHeartbeatProto); - _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this); - } - else - { - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState, _evaluatorRuntime.State)); - try - { - DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId); - if (driverInformation == null) - { - LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later."); - } - else - { - LOGGER.Log( - Level.Info, - string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId)); - Recover(driverInformation); - } - } - catch (Exception e) - { - // we do not want any exception to stop the query for driver status - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER); - } - _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this); - } - } - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - private static long CurrentTimeMilliSeconds() - { - // this is an implmenation to get current time milli second counted from Jan 1st, 1970 - // it is chose as such to be compatible with java implmentation - DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); - return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds; - } - - private void Recover(DriverInformation driverInformation) - { - IPEndPoint driverEndpoint = NetUtilities.ParseIpEndpoint(driverInformation.DriverRemoteIdentifier); - _remoteId = new SocketRemoteIdentifier(driverEndpoint); - _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId)); - lock (_evaluatorSettings) - { - if (_evaluatorSettings.NameClient != null) - { - try - { - LOGGER.Log(Level.Verbose, "Trying to reset and reconnect to name server" + driverInformation.NameServerId); - _evaluatorSettings.NameClient.Restart(NetUtilities.ParseIpEndpoint(driverInformation.NameServerId)); - LOGGER.Log(Level.Info, "Reconnected to name server: " + driverInformation.NameServerId); - } - catch (Exception e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, LOGGER); - } - } - } - - lock (_queuedHeartbeats) - { - bool firstHeartbeatInQueue = true; - while (_queuedHeartbeats.Any()) - { - LOGGER.Log(Level.Info, "Sending cached recovery heartbeats to " + _remoteId); - try - { - if (firstHeartbeatInQueue) - { - // first heartbeat is specially construted to include the recovery flag - EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue()); - LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat); - _observer.OnNext(new REEFMessage(recoveryHeartbeat)); - firstHeartbeatInQueue = false; - } - else - { - _observer.OnNext(new REEFMessage(_queuedHeartbeats.Dequeue())); - } - } - catch (Exception e) - { - // we do not handle failures during RECOVERY - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow( - e, - Level.Error, - string.Format(CultureInfo.InvariantCulture, "Hearbeat attempt failed in RECOVERY mode to Driver {0} , giving up...", _remoteId), - LOGGER); - } - Thread.Sleep(500); - } - } - _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL; - LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ==========="); - } - - private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat) - { - heartbeat.recovery = true; - heartbeat.context_status.ForEach(c => c.recovery = true); - heartbeat.task_status.recovery = true; - return heartbeat; - } - - private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto() - { - return GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), - _contextManager.GetContextStatusCollection(), - _contextManager.GetTaskStatus()); - } - - private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto( - EvaluatorStatusProto evaluatorStatusProto, - ICollection<ContextStatusProto> contextStatusProtos, - Optional<TaskStatusProto> taskStatusProto) - { - EvaluatorHeartbeatProto evaluatorHeartbeatProto = new EvaluatorHeartbeatProto() - { - timestamp = CurrentTimeMilliSeconds(), - evaluator_status = evaluatorStatusProto - }; - foreach (ContextStatusProto contextStatusProto in contextStatusProtos) - { - evaluatorHeartbeatProto.context_status.Add(contextStatusProto); - } - if (taskStatusProto.IsPresent()) - { - evaluatorHeartbeatProto.task_status = taskStatusProto.Value; - } - return evaluatorHeartbeatProto; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs deleted file mode 100644 index 5593e08..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/ReefMessageProtoObserver.cs +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.ProtoBuf.ReefProtocol; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; -using System; -using System.Globalization; -using System.Threading; - -namespace Org.Apache.REEF.Common -{ - public class ReefMessageProtoObserver : - IObserver<IRemoteMessage<REEFMessage>>, - IObservable<IRemoteMessage<REEFMessage>>, - IDisposable - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ReefMessageProtoObserver)); - private volatile IObserver<IRemoteMessage<REEFMessage>> _observer = null; - private long _count = 0; - private DateTime _begin; - private DateTime _origBegin; - - public void OnCompleted() - { - } - - public void OnError(Exception error) - { - } - - public void OnNext(IRemoteMessage<REEFMessage> value) - { - REEFMessage remoteEvent = value.Message; - IRemoteIdentifier id = value.Identifier; - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "receive a ReefMessage from {0} Driver at {1}.", remoteEvent, id)); - - if (remoteEvent.evaluatorControl != null) - { - if (remoteEvent.evaluatorControl.context_control != null) - { - string context_message = null; - string task_message = null; - - if (remoteEvent.evaluatorControl.context_control.context_message != null) - { - context_message = remoteEvent.evaluatorControl.context_control.context_message.ToString(); - } - if (remoteEvent.evaluatorControl.context_control.task_message != null) - { - task_message = ByteUtilities.ByteArrarysToString(remoteEvent.evaluatorControl.context_control.task_message); - } - - if (!(string.IsNullOrEmpty(context_message) && string.IsNullOrEmpty(task_message))) - { - LOGGER.Log(Level.Info, - string.Format(CultureInfo.InvariantCulture, "Control protobuf with context message [{0}] and task message [{1}]", context_message, task_message)); - } - else if (remoteEvent.evaluatorControl.context_control.remove_context != null) - { - LOGGER.Log(Level.Info, - string.Format(CultureInfo.InvariantCulture, "Control protobuf to remove context {0}", remoteEvent.evaluatorControl.context_control.remove_context.context_id)); - } - else if (remoteEvent.evaluatorControl.context_control.add_context != null) - { - LOGGER.Log(Level.Info, - string.Format(CultureInfo.InvariantCulture, "Control protobuf to add a context on top of {0}", remoteEvent.evaluatorControl.context_control.add_context.parent_context_id)); - } - else if (remoteEvent.evaluatorControl.context_control.start_task != null) - { - LOGGER.Log(Level.Info, - string.Format(CultureInfo.InvariantCulture, "Control protobuf to start an task in {0}", remoteEvent.evaluatorControl.context_control.start_task.context_id)); - } - else if (remoteEvent.evaluatorControl.context_control.stop_task != null) - { - LOGGER.Log(Level.Info, "Control protobuf to stop task"); - } - else if (remoteEvent.evaluatorControl.context_control.suspend_task != null) - { - LOGGER.Log(Level.Info, "Control protobuf to suspend task"); - } - } - } - if (_count == 0) - { - _begin = DateTime.Now; - _origBegin = _begin; - } - var count = Interlocked.Increment(ref _count); - - int printBatchSize = 100000; - if (count % printBatchSize == 0) - { - DateTime end = DateTime.Now; - var diff = (end - _begin).TotalMilliseconds; - double seconds = diff / 1000.0; - long eventsPerSecond = (long)(printBatchSize / seconds); - _begin = DateTime.Now; - } - - var observer = _observer; - if (observer != null) - { - observer.OnNext(value); - } - } - - public IDisposable Subscribe(IObserver<IRemoteMessage<REEFMessage>> observer) - { - if (_observer != null) - { - return null; - } - _observer = observer; - return this; - } - - public void Dispose() - { - _observer = null; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs deleted file mode 100644 index 3a0b474..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextClientCodeException.cs +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Tang.Interface; -using System; - -namespace Org.Apache.REEF.Common.Context -{ - public class ContextClientCodeException : Exception - { - private readonly string _contextId; - private readonly Optional<string> _parentId; - - /// <summary> - /// construt the exception that caused the error - /// </summary> - /// <param name="contextId"> the id of the failed context.</param> - /// <param name="parentId"> the id of the failed context's parent, if any.</param> - /// <param name="message"> the error message </param> - /// <param name="cause"> the exception that caused the error</param> - public ContextClientCodeException( - string contextId, - Optional<string> parentId, - string message, - Exception cause) - : base("Failure in context '" + contextId + "': " + message, cause) - { - _contextId = contextId; - _parentId = parentId; - } - - public string ContextId - { - get { return _contextId; } - } - - public Optional<string> ParentId - { - get { return _parentId; } - } - - /// <summary> - /// Extracts a context id from the given configuration. - /// </summary> - /// <param name="c"></param> - /// <returns>the context id in the given configuration.</returns> - public static string GetId(IConfiguration c) - { - // TODO: update after TANG is available - return string.Empty; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs deleted file mode 100644 index bcd7fb0..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextConfiguration.cs +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Interface; -using System; -using System.Collections.Generic; -using System.Globalization; -using System.IO; -using Org.Apache.REEF.Tang.Types; - -namespace Org.Apache.REEF.Common.Evaluator.Context -{ - public class ContextConfiguration : IConfiguration - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextConfiguration)); - - private Dictionary<string, string> _settings; - - public ContextConfiguration(string configString) - { - using (LOGGER.LogFunction("ContextConfiguration::ContextConfigurationn")) - { - ContainerDirectory = Directory.GetCurrentDirectory(); - - _settings = new Dictionary<string, string>(); - AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString); - foreach (ConfigurationEntry config in avroConfiguration.Bindings) - { - if (config.key.Contains(REEF.Evaluator.Constants.ContextIdentifier)) - { - config.key = REEF.Evaluator.Constants.ContextIdentifier; - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "{0} detected for context id with value {1}", config.key, config.value)); - } - _settings.Add(config.key, config.value); - } - if (!_settings.ContainsKey(REEF.Evaluator.Constants.ContextIdentifier)) - { - string msg = "Required parameter ContextIdentifier not provided."; - LOGGER.Log(Level.Error, msg); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER); - } - } - } - - public string Id - { - get { return _settings[REEF.Evaluator.Constants.ContextIdentifier]; } - } - - public string ContainerDirectory { get; set; } - - public IConfigurationBuilder newBuilder() - { - throw new NotImplementedException(); - } - - public string GetNamedParameter(INamedParameterNode np) - { - throw new NotImplementedException(); - } - - public IClassHierarchy GetClassHierarchy() - { - throw new NotImplementedException(); - } - - public ISet<object> GetBoundSet(INamedParameterNode np) - { - throw new NotImplementedException(); - } - - public IClassNode GetBoundConstructor(IClassNode cn) - { - throw new NotImplementedException(); - } - - public IClassNode GetBoundImplementation(IClassNode cn) - { - throw new NotImplementedException(); - } - - public IConstructorDef GetLegacyConstructor(IClassNode cn) - { - throw new NotImplementedException(); - } - - public ICollection<IClassNode> GetBoundImplementations() - { - throw new NotImplementedException(); - } - - public ICollection<IClassNode> GetBoundConstructors() - { - throw new NotImplementedException(); - } - - public ICollection<INamedParameterNode> GetNamedParameters() - { - throw new NotImplementedException(); - } - - public ICollection<IClassNode> GetLegacyConstructors() - { - throw new NotImplementedException(); - } - - public IList<object> GetBoundList(INamedParameterNode np) - { - throw new NotImplementedException(); - } - - public IEnumerator<KeyValuePair<INamedParameterNode, object>> GetBoundSets() - { - throw new NotImplementedException(); - } - - public IDictionary<INamedParameterNode, IList<object>> GetBoundList() - { - throw new NotImplementedException(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs deleted file mode 100644 index 97e65c0..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextLifeCycle.cs +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Events; -using System; -using System.Collections.Generic; - -namespace Org.Apache.REEF.Common.Context -{ - /// <summary> - /// This class is used to trigger all the context life-cycle dependent events. - /// </summary> - class ContextLifeCycle - { - private HashSet<IObserver<IContextStart>> _contextStartHandlers; - - private HashSet<IObserver<IContextStop>> _contextStopHandlers; - - private HashSet<IContextMessageSource> _contextMessageSources; - - // @Inject - public ContextLifeCycle( - string id, - HashSet<IObserver<IContextStart>> contextStartHandlers, - HashSet<IObserver<IContextStop>> contextStopHandlers, - HashSet<IContextMessageSource> contextMessageSources) - { - Id = id; - _contextStartHandlers = contextStartHandlers; - _contextStopHandlers = contextStopHandlers; - _contextMessageSources = contextMessageSources; - } - - public ContextLifeCycle(string contextId) - { - Id = contextId; - _contextStartHandlers = new HashSet<IObserver<IContextStart>>(); - _contextStopHandlers = new HashSet<IObserver<IContextStop>>(); - _contextMessageSources = new HashSet<IContextMessageSource>(); - } - - public string Id { get; private set; } - - public HashSet<IContextMessageSource> ContextMessageSources - { - get { return _contextMessageSources; } - } - - /// <summary> - /// Fires ContextStart to all registered event handlers. - /// </summary> - public void Start() - { - IContextStart contextStart = new ContextStartImpl(Id); - - // TODO: enable - //foreach (IObserver<IContextStart> startHandler in _contextStartHandlers) - //{ - // startHandler.OnNext(contextStart); - //} - } - - /// <summary> - /// Fires ContextStop to all registered event handlers. - /// </summary> - public void Close() - { - //IContextStop contextStop = new ContextStopImpl(Id); - //foreach (IObserver<IContextStop> startHandler in _contextStopHandlers) - //{ - // startHandler.OnNext(contextStop); - //} - } - - public void HandleContextMessage(byte[] message) - { - //contextMessageHandler.onNext(message); - } - - /// <summary> - /// get the set of ContextMessageSources configured - /// </summary> - /// <returns>(a shallow copy of) the set of ContextMessageSources configured.</returns> - public HashSet<IContextMessageSource> GetContextMessageSources() - { - return new HashSet<IContextMessageSource>(_contextMessageSources); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7edb8570/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs deleted file mode 100644 index 7c4d288..0000000 --- a/lang/cs/Org.Apache.REEF.Common/runtime/evaluator/context/ContextManager.cs +++ /dev/null @@ -1,362 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Common.Evaluator.Context; -using Org.Apache.REEF.Common.ProtoBuf.EvaluatorRunTimeProto; -using Org.Apache.REEF.Common.ProtoBuf.ReefServiceProto; -using Org.Apache.REEF.Common.Task; -using Org.Apache.REEF.Evaluator; -using Org.Apache.REEF.Services; -using Org.Apache.REEF.Tasks; -using Org.Apache.REEF.Utilities; -using Org.Apache.REEF.Utilities.Logging; -using System; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.Globalization; -using System.Linq; - -namespace Org.Apache.REEF.Common.Context -{ - public class ContextManager : IDisposable - { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager)); - - private readonly Stack<ContextRuntime> _contextStack = new Stack<ContextRuntime>(); - - private readonly HeartBeatManager _heartBeatManager; - - private RootContextLauncher _rootContextLauncher; - - public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig) - { - using (LOGGER.LogFunction("ContextManager::ContextManager")) - { - _heartBeatManager = heartBeatManager; - _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig); - } - } - - /// <summary> - /// Start the context manager. This initiates the root context. - /// </summary> - public void Start() - { - lock (_contextStack) - { - ContextRuntime rootContext = _rootContextLauncher.GetRootContext(); - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Instantiating root context with Id {0}", rootContext.Id)); - _contextStack.Push(rootContext); - - if (_rootContextLauncher.RootTaskConfig.IsPresent()) - { - LOGGER.Log(Level.Info, "Launching the initial Task"); - try - { - _contextStack.Peek().StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.RootContextConfig.Id, _heartBeatManager); - } - catch (TaskClientCodeException e) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Exception when trying to start a task.", LOGGER); - HandleTaskException(e); - } - } - } - } - - public bool ContextStackIsEmpty() - { - lock (_contextStack) - { - return (_contextStack.Count == 0); - } - } - - // TODO: codes here are slightly different from java since the protobuf.net does not generate the HasXXX method, may want to switch to proto-port later - - /// <summary> - /// Processes the given ContextControlProto to launch / close / suspend Tasks and Contexts. - /// This also triggers the HeartBeatManager to send a heartbeat with the result of this operation. - /// </summary> - /// <param name="controlMessage"></param> - public void HandleTaskControl(ContextControlProto controlMessage) - { - try - { - byte[] message = controlMessage.task_message; - if (controlMessage.add_context != null && controlMessage.remove_context != null) - { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("Received a message with both add and remove context. This is unsupported."), LOGGER); - } - if (controlMessage.add_context != null) - { - LOGGER.Log(Level.Info, "AddContext"); - AddContext(controlMessage.add_context); - // support submitContextAndTask() - if (controlMessage.start_task != null) - { - LOGGER.Log(Level.Info, "StartTask"); - StartTask(controlMessage.start_task); - } - else - { - // We need to trigger a heartbeat here. In other cases, the heartbeat will be triggered by the TaskRuntime - // Therefore this call can not go into addContext - LOGGER.Log(Level.Info, "Trigger Heartbeat"); - _heartBeatManager.OnNext(); - } - } - else if (controlMessage.remove_context != null) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RemoveContext with id {0}", controlMessage.remove_context.context_id)); - RemoveContext(controlMessage.remove_context.context_id); - } - else if (controlMessage.start_task != null) - { - LOGGER.Log(Level.Info, "StartTask only"); - StartTask(controlMessage.start_task); - } - else if (controlMessage.stop_task != null) - { - LOGGER.Log(Level.Info, "CloseTask"); - _contextStack.Peek().CloseTask(message); - } - else if (controlMessage.suspend_task != null) - { - LOGGER.Log(Level.Info, "SuspendTask"); - _contextStack.Peek().SuspendTask(message); - } - else if (controlMessage.task_message != null) - { - LOGGER.Log(Level.Info, "DeliverTaskMessage"); - _contextStack.Peek().DeliverTaskMessage(message); - } - else if (controlMessage.context_message != null) - { - LOGGER.Log(Level.Info, "Handle context contol message"); - ContextMessageProto contextMessageProto = controlMessage.context_message; - bool deliveredMessage = false; - foreach (ContextRuntime context in _contextStack) - { - if (context.Id.Equals(contextMessageProto.context_id)) - { - LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Handle context message {0}", controlMessage.context_message.message)); - context.HandleContextMessaage(controlMessage.context_message.message); - deliveredMessage = true; - break; - } - } - if (!deliveredMessage) - { - InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Sent message to unknown context {0}", contextMessageProto.context_id)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - } - else - { - InvalidOperationException e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Unknown task control message: {0}", controlMessage.ToString())); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - } - catch (Exception e) - { - if (e is TaskClientCodeException) - { - HandleTaskException((TaskClientCodeException)e); - } - else if (e is ContextClientCodeException) - { - HandlContextException((ContextClientCodeException)e); - } - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(e, Level.Error, LOGGER); - } - } - - /// <summary> - /// Get TaskStatusProto of the currently running task, if there is any - /// </summary> - /// <returns>the TaskStatusProto of the currently running task, if there is any</returns> - public Optional<TaskStatusProto> GetTaskStatus() - { - if (_contextStack.Count == 0) - { - return Optional<TaskStatusProto>.Empty(); - - //throw new InvalidOperationException("Asked for an Task status while there isn't even a context running."); - } - return _contextStack.Peek().GetTaskStatus(); - } - - /// <summary> - /// get status of all contexts in the stack. - /// </summary> - /// <returns>the status of all contexts in the stack.</returns> - public ICollection<ContextStatusProto> GetContextStatusCollection() - { - ICollection<ContextStatusProto> result = new Collection<ContextStatusProto>(); - foreach (ContextRuntime runtime in _contextStack) - { - ContextStatusProto contextStatusProto = runtime.GetContextStatus(); - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Add context status: {0}", contextStatusProto)); - result.Add(contextStatusProto); - } - return result; - } - - /// <summary> - /// Shuts down. This forecefully kills the Task if there is one and then shuts down all Contexts on the stack, - /// starting at the top. - /// </summary> - public void Dispose() - { - lock (_contextStack) - { - if (_contextStack != null && _contextStack.Any()) - { - LOGGER.Log(Level.Info, "context stack not empty, forcefully closing context runtime."); - ContextRuntime runtime = _contextStack.Last(); - if (runtime != null) - { - runtime.Dispose(); - } - } - } - } - - /// <summary> - /// Add a context to the stack. - /// </summary> - /// <param name="addContextProto"></param> - private void AddContext(AddContextProto addContextProto) - { - lock (_contextStack) - { - ContextRuntime currentTopContext = _contextStack.Peek(); - if (!currentTopContext.Id.Equals(addContextProto.parent_context_id, StringComparison.OrdinalIgnoreCase)) - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to instantiate a child context on context with id '{0}' while the current top context id is {1}", - addContextProto.parent_context_id, - currentTopContext.Id)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - string contextConfigString = addContextProto.context_configuration; - ContextConfiguration contextConfiguration = new ContextConfiguration(contextConfigString); - ContextRuntime newTopContext; - if (addContextProto.service_configuration != null) - { - ServiceConfiguration serviceConfiguration = new ServiceConfiguration(addContextProto.service_configuration); - newTopContext = currentTopContext.SpawnChildContext(contextConfiguration, serviceConfiguration.TangConfig); - } - else - { - newTopContext = currentTopContext.SpawnChildContext(contextConfiguration); - } - _contextStack.Push(newTopContext); - } - } - - /// <summary> - /// Remove the context with the given ID from the stack. - /// </summary> - /// <param name="contextId"> context id</param> - private void RemoveContext(string contextId) - { - lock (_contextStack) - { - string currentTopContextId = _contextStack.Peek().Id; - if (!contextId.Equals(_contextStack.Peek().Id, StringComparison.OrdinalIgnoreCase)) - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Trying to close context with id '{0}' while the top context id is {1}", contextId, currentTopContextId)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - _contextStack.Peek().Dispose(); - if (_contextStack.Count > 1) - { - // We did not close the root context. Therefore, we need to inform the - // driver explicitly that this context is closed. The root context notification - // is implicit in the Evaluator close/done notification. - _heartBeatManager.OnNext(); // Ensure Driver gets notified of context DONE state - } - _contextStack.Pop(); - } - // System.gc(); // TODO: garbage collect? - } - - /// <summary> - /// Launch an Task. - /// </summary> - /// <param name="startTaskProto"></param> - private void StartTask(StartTaskProto startTaskProto) - { - lock (_contextStack) - { - ContextRuntime currentActiveContext = _contextStack.Peek(); - string expectedContextId = startTaskProto.context_id; - if (!expectedContextId.Equals(currentActiveContext.Id, StringComparison.OrdinalIgnoreCase)) - { - var e = new InvalidOperationException(string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id '{1}'", expectedContextId, currentActiveContext.Id)); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); - } - TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration); - currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager); - } - } - - /// <summary> - /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager - /// </summary> - /// <param name="e"></param> - private void HandleTaskException(TaskClientCodeException e) - { - LOGGER.Log(Level.Error, "TaskClientCodeException", e); - byte[] exception = ByteUtilities.StringToByteArrays(e.ToString()); - TaskStatusProto taskStatus = new TaskStatusProto() - { - context_id = e.ContextId, - task_id = e.TaskId, - result = exception, - state = State.FAILED - }; - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeatb for a failed task: {0}", taskStatus.ToString())); - _heartBeatManager.OnNext(taskStatus); - } - - /// <summary> - /// THIS ASSUMES THAT IT IS CALLED ON A THREAD HOLDING THE LOCK ON THE HeartBeatManager - /// </summary> - /// <param name="e"></param> - private void HandlContextException(ContextClientCodeException e) - { - LOGGER.Log(Level.Error, "ContextClientCodeException", e); - byte[] exception = ByteUtilities.StringToByteArrays(e.ToString()); - ContextStatusProto contextStatusProto = new ContextStatusProto() - { - context_id = e.ContextId, - context_state = ContextStatusProto.State.FAIL, - error = exception - }; - if (e.ParentId.IsPresent()) - { - contextStatusProto.parent_id = e.ParentId.Value; - } - LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending Heartbeat for a failed context: {0}", contextStatusProto.ToString())); - _heartBeatManager.OnNext(contextStatusProto); - } - } -} \ No newline at end of file
