Repository: reef Updated Branches: refs/heads/master 77546f8f3 -> f4d19ca40
[REEF-1049] Refactor Evaluator to properly use Tang Configuration Made the following constructors injectable: * ContextManager * HeartBeatManager * EvaluatorRuntime * EvaluatorSettings * Consolidate injector for Clock, clrDriver Configuration and Evaluator * Consolidate EvaluatorSettings and EvaluatorConfigurations. EvaluatorConfigurations will be deprecated. JIRA: [REEF-1049](https://issues.apache.org/jira/browse/REEF-1049) Pull Request: This closes #719 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/f4d19ca4 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/f4d19ca4 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/f4d19ca4 Branch: refs/heads/master Commit: f4d19ca407cd6d6f7ab44939da630cdf88de51f0 Parents: 77546f8 Author: Julia Wang <[email protected]> Authored: Mon Dec 14 20:35:19 2015 -0800 Committer: Markus Weimer <[email protected]> Committed: Tue Dec 15 10:26:05 2015 -0800 ---------------------------------------------------------------------- .../Files/REEFFileNames.cs | 9 + .../Org.Apache.REEF.Common.csproj | 2 + .../Protobuf/ReefProtocol/REEFMessageCodec.cs | 5 + .../Runtime/Evaluator/Context/ContextManager.cs | 15 +- .../Runtime/Evaluator/EvaluatorRuntime.cs | 12 +- .../Runtime/Evaluator/EvaluatorSettings.cs | 180 +++++++++++---- .../Runtime/Evaluator/HeartBeatManager.cs | 98 ++++---- .../Parameters/EvaluatorHeartbeatPeriodInMs.cs | 28 +++ .../Evaluator/Parameters/HeartbeatMaxRetry.cs | 28 +++ .../Evaluator/Utils/EvaluatorConfigurations.cs | 109 +++++---- .../EvaluatorConfigurationsTests.cs | 2 +- lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs | 223 ++++++------------- 12 files changed, 413 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs index 7b5c87a..52bd1cc 100644 --- a/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs +++ b/lang/cs/Org.Apache.REEF.Common/Files/REEFFileNames.cs @@ -49,6 +49,7 @@ namespace Org.Apache.REEF.Common.Files private const string DRIVER_CONFIGURATION_NAME = "driver.conf"; private const string EVALUATOR_CONFIGURATION_NAME = "evaluator.conf"; private const string CLR_DRIVER_CONFIGURATION_NAME = "clrdriver.conf"; + private const string CLR_BRIDGE_CONFIGURATION_NAME = "clrBridge.config"; private const string DRIVER_HTTP_ENDPOINT_FILE_NAME = "DriverHttpEndpoint.txt"; private const string BRIDGE_EXE_NAME = "Org.Apache.REEF.Bridge.exe"; private const string BRIDGE_EXE_CONFIG_NAME = "Org.Apache.REEF.Bridge.exe.config"; @@ -153,6 +154,14 @@ namespace Org.Apache.REEF.Common.Files /// <summary> /// </summary> + /// <returns>It returns the clrBridge.config file name</returns> + public string GetClrBridgeConfigurationName() + { + return CLR_BRIDGE_CONFIGURATION_NAME; + } + + /// <summary> + /// </summary> /// <returns> The suffix used for JAR files, including the "."</returns> public string GetJarFileSuffix() { http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index e911aa0..836a6db 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -137,6 +137,8 @@ under the License. 
<Compile Include="Runtime\Evaluator\EvaluatorRuntime.cs" /> <Compile Include="Runtime\Evaluator\EvaluatorSettings.cs" /> <Compile Include="Runtime\Evaluator\HeartBeatManager.cs" /> + <Compile Include="Runtime\Evaluator\Parameters\EvaluatorHeartbeatPeriodInMs.cs" /> + <Compile Include="Runtime\Evaluator\Parameters\HeartbeatMaxRetry.cs" /> <Compile Include="Runtime\Evaluator\ReefMessageProtoObserver.cs" /> <Compile Include="Runtime\Evaluator\Task\CloseEventImpl.cs" /> <Compile Include="Runtime\Evaluator\Task\DriverMessageImpl.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs index 697ea1c..8f74fa4 100644 --- a/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs +++ b/lang/cs/Org.Apache.REEF.Common/Protobuf/ReefProtocol/REEFMessageCodec.cs @@ -24,6 +24,11 @@ namespace Org.Apache.REEF.Common.Protobuf.ReefProtocol { public class REEFMessageCodec : ICodec<REEFMessage> { + [Inject] + private REEFMessageCodec() + { + } + public byte[] Encode(REEFMessage obj) { return obj.Serialize(); http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index f227c90..731cc8e 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -26,6 +26,7 @@ using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using 
Org.Apache.REEF.Common.Runtime.Evaluator.Task; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; @@ -39,12 +40,18 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context private readonly object _contextLock = new object(); private ContextRuntime _topContext = null; - public ContextManager(HeartBeatManager heartBeatManager, Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig) + [Inject] + private ContextManager( + HeartBeatManager heartBeatManager, + EvaluatorSettings evaluatorSetting) { using (LOGGER.LogFunction("ContextManager::ContextManager")) { _heartBeatManager = heartBeatManager; - _rootContextLauncher = new RootContextLauncher(_heartBeatManager.EvaluatorSettings.RootContextConfig, rootServiceConfig, rootTaskConfig); + _rootContextLauncher = new RootContextLauncher( + evaluatorSetting.RootContextConfig, + evaluatorSetting.RootServiceConfiguration, + evaluatorSetting.RootTaskConfiguration); } } @@ -102,7 +109,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { LOGGER.Log(Level.Info, "AddContext"); AddContext(controlMessage.add_context); - + // support submitContextAndTask() if (controlMessage.start_task != null) { @@ -153,7 +160,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } else if (controlMessage.context_message != null) { - LOGGER.Log(Level.Info, "Handle context contol message"); + LOGGER.Log(Level.Info, "Handle context control message"); ContextMessageProto contextMessageProto = controlMessage.context_message; ContextRuntime context = null; lock (_contextLock) http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index be0acea..42e35fa 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -48,11 +48,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { using (Logger.LogFunction("EvaluatorRuntime::EvaluatorRuntime")) { - _clock = heartBeatManager.EvaluatorSettings.RuntimeClock; _heartBeatManager = heartBeatManager; + _clock = _heartBeatManager.EvaluatorSettings.RuntimeClock; _contextManager = contextManager; - _evaluatorId = heartBeatManager.EvaluatorSettings.EvalutorId; - var remoteManager = heartBeatManager.EvaluatorSettings.RemoteManager; + _evaluatorId = _heartBeatManager.EvaluatorSettings.EvalutorId; + var remoteManager = _heartBeatManager.EvaluatorSettings.RemoteManager; ReefMessageProtoObserver driverObserver = new ReefMessageProtoObserver(); @@ -62,8 +62,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator // register the driver observer _evaluatorControlChannel = remoteManager.RegisterObserver(driverObserver); - // start the hearbeat - _clock.ScheduleAlarm(0, heartBeatManager); + // start the heart beat + _clock.ScheduleAlarm(0, _heartBeatManager); } } @@ -196,7 +196,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator _state = State.FAILED; string errorMessage = string.Format( CultureInfo.InvariantCulture, - "failed with error [{0}] with mesage [{1}] and stack trace [{2}]", + "failed with error [{0}] with message [{1}] and stack trace [{2}]", e, e.Message, e.StackTrace); http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs index 73991d6..dfd78a7 100644 --- 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs @@ -22,76 +22,86 @@ using Org.Apache.REEF.Common.Evaluator; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Context; +using Org.Apache.REEF.Common.Runtime.Evaluator.Parameters; +using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; +using Org.Apache.REEF.Common.Services; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Time; +using Org.Apache.REEF.Wake.Time.Runtime; namespace Org.Apache.REEF.Common.Runtime.Evaluator { - // TODO: merge with EvaluatorConfigurations class internal sealed class EvaluatorSettings { - private readonly string _applicationId; + private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorSettings)); + private readonly string _applicationId; private readonly string _evaluatorId; - private readonly int _heartBeatPeriodInMs; - private readonly int _maxHeartbeatRetries; - - private readonly ContextConfiguration _rootContextConfig; - private readonly IClock _clock; - private readonly IRemoteManager<REEFMessage> _remoteManager; - private readonly IInjector _injector; + private readonly ContextConfiguration _rootContextConfig; + private readonly Optional<TaskConfiguration> _rootTaskConfiguration; + private readonly Optional<ServiceConfiguration> _rootServiceConfiguration; private EvaluatorOperationState _operationState; - private INameClient _nameClient; - public EvaluatorSettings( - string applicationId, - string evaluatorId, - int heartbeatPeriodInMs, - int maxHeartbeatRetries, - ContextConfiguration rootContextConfig, - IClock clock, - IRemoteManager<REEFMessage> 
remoteManager, - IInjector injecor) + /// <summary> + /// Constructor with + /// </summary> + /// <param name="applicationId"></param> + /// <param name="evaluatorId"></param> + /// <param name="heartbeatPeriodInMs"></param> + /// <param name="maxHeartbeatRetries"></param> + /// <param name="rootContextConfigString"></param> + /// <param name="clock"></param> + /// <param name="remoteManagerFactory"></param> + /// <param name="reefMessageCodec"></param> + /// <param name="injector"></param> + [Inject] + private EvaluatorSettings( + [Parameter(typeof(ApplicationIdentifier))] string applicationId, + [Parameter(typeof(EvaluatorIdentifier))] string evaluatorId, + [Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs, + [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries, + [Parameter(typeof(RootContextConfiguration))] string rootContextConfigString, + RuntimeClock clock, + IRemoteManagerFactory remoteManagerFactory, + REEFMessageCodec reefMessageCodec, + IInjector injector) { - if (string.IsNullOrWhiteSpace(evaluatorId)) - { - throw new ArgumentNullException("evaluatorId"); - } - if (rootContextConfig == null) - { - throw new ArgumentNullException("rootContextConfig"); - } - if (clock == null) - { - throw new ArgumentNullException("clock"); - } - if (remoteManager == null) - { - throw new ArgumentNullException("remoteManager"); - } - if (injecor == null) - { - throw new ArgumentNullException("injecor"); - } + _injector = injector; _applicationId = applicationId; _evaluatorId = evaluatorId; _heartBeatPeriodInMs = heartbeatPeriodInMs; _maxHeartbeatRetries = maxHeartbeatRetries; - _rootContextConfig = rootContextConfig; _clock = clock; - _remoteManager = remoteManager; - _injector = injecor; + + if (string.IsNullOrWhiteSpace(rootContextConfigString)) + { + Utilities.Diagnostics.Exceptions.Throw( + new ArgumentException("empty or null rootContextConfigString"), Logger); + } + _rootContextConfig = new 
ContextConfiguration(rootContextConfigString); + _rootTaskConfiguration = CreateTaskConfiguration(); + _rootServiceConfiguration = CreateRootServiceConfiguration(); + + _remoteManager = remoteManagerFactory.GetInstance(reefMessageCodec); _operationState = EvaluatorOperationState.OPERATIONAL; } + /// <summary> + /// Operator State. Can be set and get. + /// </summary> public EvaluatorOperationState OperationState { get @@ -105,6 +115,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } + /// <summary> + /// Return Evaluator Id got from Evaluator Configuration + /// </summary> public string EvalutorId { get @@ -113,6 +126,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } + /// <summary> + /// Return HeartBeatPeriodInMs from NamedParameter + /// </summary> public int HeartBeatPeriodInMs { get @@ -121,6 +137,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } + /// <summary> + /// Return Application Id got from Evaluator Configuration + /// </summary> public string ApplicationId { get @@ -129,7 +148,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } - public int MaxHeartbeatFailures + /// <summary> + /// Return MaxHeartbeatRetries from NamedParameter + /// </summary> + public int MaxHeartbeatRetries { get { @@ -137,14 +159,33 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } + /// <summary> + /// return Root Context Configuration passed from Evaluator configuration + /// </summary> public ContextConfiguration RootContextConfig { - get - { - return _rootContextConfig; - } + get { return _rootContextConfig; } + } + + /// <summary> + /// return Root Task Configuration passed from Evaluator configuration + /// </summary> + public Optional<TaskConfiguration> RootTaskConfiguration + { + get { return _rootTaskConfiguration; } + } + + /// <summary> + /// return Root Service Configuration passed from Evaluator configuration + /// </summary> + public Optional<ServiceConfiguration> RootServiceConfiguration + { + get { return 
_rootServiceConfiguration; } } + /// <summary> + /// return Runtime Clock injected from the constructor + /// </summary> public IClock RuntimeClock { get @@ -153,6 +194,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } + /// <summary> + /// Return Name Client + /// </summary> public INameClient NameClient { get @@ -166,6 +210,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } + /// <summary> + /// return Remote manager + /// </summary> public IRemoteManager<REEFMessage> RemoteManager { get @@ -174,12 +221,49 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } - public IInjector Injector + /// <summary> + /// Injector that contains clrDrive configuration and Evaluator configuration + /// </summary> + public IInjector EvaluatorInjector { get { return _injector; } } + + private Optional<TaskConfiguration> CreateTaskConfiguration() + { + string taskConfigString = null; + try + { + taskConfigString = _injector.GetNamedInstance<InitialTaskConfiguration, string>(); + } + catch (InjectionException) + { + Logger.Log(Level.Info, "InitialTaskConfiguration is not set in Evaluator.config."); + } + return string.IsNullOrEmpty(taskConfigString) + ? Optional<TaskConfiguration>.Empty() + : Optional<TaskConfiguration>.Of( + new TaskConfiguration(taskConfigString)); + } + + private Optional<ServiceConfiguration> CreateRootServiceConfiguration() + { + string rootServiceConfigString = null; + try + { + rootServiceConfigString = _injector.GetNamedInstance<RootServiceConfiguration, string>(); + } + catch (InjectionException) + { + Logger.Log(Level.Info, "RootServiceConfiguration is not set in Evaluator.config."); + } + return string.IsNullOrEmpty(rootServiceConfigString) + ? 
Optional<ServiceConfiguration>.Empty() + : Optional<ServiceConfiguration>.Of( + new ServiceConfiguration(rootServiceConfigString)); + } } } http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs index 7822d7d..ddfa0eb 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs @@ -27,7 +27,10 @@ using System.Threading; using Org.Apache.REEF.Common.Evaluator; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Context; +using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.InjectionPlan; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote; @@ -51,8 +54,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private readonly int _maxHeartbeatRetries = 0; - private readonly string _evaluatorId; - private IRemoteIdentifier _remoteId; private IObserver<REEFMessage> _observer; @@ -61,46 +62,61 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private IDriverConnection _driverConnection; - private EvaluatorSettings _evaluatorSettings; + private readonly EvaluatorSettings _evaluatorSettings; + + private readonly IInjectionFuture<EvaluatorRuntime> _evaluatorRuntime; + + private readonly IInjectionFuture<ContextManager> _contextManager; // the queue can only contains the following: // 1. all failed heartbeats (regular and event-based) before entering RECOVERY state // 2. 
event-based heartbeats generated in RECOVERY state (since there will be no attempt to send regular heartbeat) private readonly Queue<EvaluatorHeartbeatProto> _queuedHeartbeats = new Queue<EvaluatorHeartbeatProto>(); - public HeartBeatManager(EvaluatorSettings settings, IRemoteIdentifier remoteId) + [Inject] + private HeartBeatManager( + EvaluatorSettings settings, + IInjectionFuture<EvaluatorRuntime> evaluatorRuntime, + IInjectionFuture<ContextManager> contextManager, + [Parameter(typeof(ErrorHandlerRid))] string errorHandlerRid) { using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager")) { + _evaluatorSettings = settings; + _evaluatorRuntime = evaluatorRuntime; + _contextManager = contextManager; _remoteManager = settings.RemoteManager; - _remoteId = remoteId; - _evaluatorId = settings.EvalutorId; + _remoteId = new SocketRemoteIdentifier(NetUtilities.ParseIpEndpoint(errorHandlerRid)); _observer = _remoteManager.GetRemoteObserver(new RemoteEventEndPoint<REEFMessage>(_remoteId)); _clock = settings.RuntimeClock; _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs; - _maxHeartbeatRetries = settings.MaxHeartbeatFailures; - EvaluatorSettings = settings; + _maxHeartbeatRetries = settings.MaxHeartbeatRetries; MachineStatus.ToString(); // kick start the CPU perf counter } } - [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")] - public EvaluatorRuntime _evaluatorRuntime { get; set; } - - [SuppressMessage("StyleCop.CSharp.NamingRules", "SA1307:AccessibleFieldsMustBeginWithUpperCaseLetter", Justification = "Intended to be private, exposed now before using future injection")] - public ContextManager _contextManager { get; set; } + /// <summary> + /// Return EvaluatorRuntime referenced from HeartBeatManager + /// </summary> + public EvaluatorRuntime EvaluatorRuntime + { + get { return _evaluatorRuntime.Get(); } + } - public 
EvaluatorSettings EvaluatorSettings + /// <summary> + /// Return ContextManager referenced from HeartBeatManager + /// </summary> + public ContextManager ContextManager { - get - { - return _evaluatorSettings; - } + get { return _contextManager.Get(); } + } - private set - { - _evaluatorSettings = value; - } + /// <summary> + /// EvaluatorSettings contains the configuration data of the evaluators + /// </summary> + internal EvaluatorSettings EvaluatorSettings + { + get { return _evaluatorSettings; } } public void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto) @@ -137,11 +153,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { LOGGER.Log(Level.Warning, "Heartbeat communications to driver reached max of {0} failures. Driver is considered dead/unreachable", _heartbeatFailures); LOGGER.Log(Level.Info, "=========== Entering RECOVERY mode. ==========="); - _contextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Disconnected)); + ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Disconnected)); try { - _driverConnection = _evaluatorSettings.Injector.GetInstance<IDriverConnection>(); + _driverConnection = _evaluatorSettings.EvaluatorInjector.GetInstance<IDriverConnection>(); } catch (Exception ex) { @@ -162,7 +178,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// </summary> public void OnNext() { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext()"); + LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext()"); lock (this) { LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext()"); @@ -178,13 +194,13 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <param name="taskStatusProto"></param> public void OnNext(TaskStatusProto taskStatusProto) { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(TaskStatusProto)"); + LOGGER.Log(Level.Verbose, "Before acquiring lock: 
HeartbeatManager::OnNext(TaskStatusProto)"); lock (this) { LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(TaskStatusProto)"); EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), - _contextManager.GetContextStatusCollection(), + EvaluatorRuntime.GetEvaluatorStatus(), + ContextManager.GetContextStatusCollection(), Optional<TaskStatusProto>.Of(taskStatusProto)); LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); Send(heartbeatProto); @@ -197,15 +213,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <param name="contextStatusProto"></param> public void OnNext(ContextStatusProto contextStatusProto) { - LOGGER.Log(Level.Verbose, "Before aqcuiring lock: HeartbeatManager::OnNext(ContextStatusProto)"); + LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext(ContextStatusProto)"); lock (this) { LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(ContextStatusProto)"); List<ContextStatusProto> contextStatusProtos = new List<ContextStatusProto>(); contextStatusProtos.Add(contextStatusProto); - contextStatusProtos.AddRange(_contextManager.GetContextStatusCollection()); + contextStatusProtos.AddRange(ContextManager.GetContextStatusCollection()); EvaluatorHeartbeatProto heartbeatProto = GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), + EvaluatorRuntime.GetEvaluatorStatus(), contextStatusProtos, Optional<TaskStatusProto>.Empty()); LOGGER.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}.", heartbeatProto)); @@ -219,7 +235,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <param name="evaluatorStatusProto"></param> public void OnNext(EvaluatorStatusProto evaluatorStatusProto) { - LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(EvaluatorStatusProto)"); + LOGGER.Log(Level.Verbose, "Before acquiring lock: 
HeartbeatManager::OnNext(EvaluatorStatusProto)"); lock (this) { LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(EvaluatorStatusProto)"); @@ -235,11 +251,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator public void OnNext(Alarm value) { - LOGGER.Log(Level.Verbose, "Before acquring lock: HeartbeatManager::OnNext(Alarm)"); + LOGGER.Log(Level.Verbose, "Before acquiring lock: HeartbeatManager::OnNext(Alarm)"); lock (this) { LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)"); - if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && _evaluatorRuntime.State == State.RUNNING) + if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && EvaluatorRuntime.State == State.RUNNING) { EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto(); LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString())); @@ -248,7 +264,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } else { - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", _evaluatorSettings.OperationState, _evaluatorRuntime.State)); + LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. 
", EvaluatorSettings.OperationState, EvaluatorRuntime.State)); try { DriverInformation driverInformation = _driverConnection.GetDriverInformation(_evaluatorSettings.ApplicationId); @@ -346,10 +362,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } Thread.Sleep(500); } - } - + } + _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL; - _contextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Reconnected)); + ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Reconnected)); LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. ==========="); } @@ -365,9 +381,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto() { return GetEvaluatorHeartbeatProto( - _evaluatorRuntime.GetEvaluatorStatus(), - _contextManager.GetContextStatusCollection(), - _contextManager.GetTaskStatus()); + EvaluatorRuntime.GetEvaluatorStatus(), + ContextManager.GetContextStatusCollection(), + ContextManager.GetTaskStatus()); } private EvaluatorHeartbeatProto GetEvaluatorHeartbeatProto( http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/EvaluatorHeartbeatPeriodInMs.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/EvaluatorHeartbeatPeriodInMs.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/EvaluatorHeartbeatPeriodInMs.cs new file mode 100644 index 0000000..8b8f6af --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/EvaluatorHeartbeatPeriodInMs.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Common.Runtime.Evaluator.Parameters +{ + [NamedParameter(Documentation = "Evaluator Heartbeat Period in ms", ShortName = "EvaluatorHeartbeatPeriodInMs", DefaultValue = "4000")] + internal sealed class EvaluatorHeartbeatPeriodInMs : Name<int> + { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs new file mode 100644 index 0000000..87a47d6 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Common.Runtime.Evaluator.Parameters +{ + [NamedParameter(Documentation = "Heartbeat Max Retry", ShortName = "HeartbeatMaxRetry", DefaultValue = "3")] + internal sealed class HeartbeatMaxRetry : Name<int> + { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs index 06b47b4..4b275da 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/EvaluatorConfigurations.cs @@ -18,13 +18,13 @@ */ using System; +using System.Globalization; using System.IO; -using System.Linq; using Org.Apache.REEF.Common.Runtime.Evaluator.Context; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Formats.AvroConfigurationDataContract; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; @@ -32,27 +32,26 @@ using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils { + [Obsolete("Deprecated in 0.14, please use EvaluatorSettings.")] 
internal sealed class EvaluatorConfigurations { private static readonly Logger LOGGER = Logger.GetLogger(typeof(EvaluatorConfigurations)); - private readonly AvroConfiguration _avroConfiguration; + private readonly string _applicationId; - private readonly IConfiguration _evaluatorConfiguration; + private readonly string _evaluatorId; - private readonly string _configFile; + private readonly string _taskConfiguration; - private string _applicationId; + private readonly string _rootContextConfiguration; - private string _evaluatorId; + private readonly string _rootServiceConfiguration; - private string _taskConfiguration; + private readonly string _errorHandlerRid; - private string _rootContextConfiguration; + private readonly string _remoteId; - private string _rootServiceConfiguration; - - private string _errorHandlerRid; + private readonly string _launchId; public EvaluatorConfigurations(string configFile) { @@ -60,22 +59,54 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils { if (string.IsNullOrWhiteSpace(configFile)) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), LOGGER); + Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), LOGGER); } if (!File.Exists(configFile)) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + configFile), LOGGER); + Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + configFile), LOGGER); } - _configFile = configFile; - AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); - _avroConfiguration = serializer.AvroDeserializeFromFile(_configFile); - - var language = _avroConfiguration.language; - LOGGER.Log(Level.Info, "The language that created the configFile is " + language); + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); var classHierarchy = TangFactory.GetTang() .GetClassHierarchy(new string[] { 
typeof(ApplicationIdentifier).Assembly.GetName().Name }); - _evaluatorConfiguration = serializer.FromAvro(_avroConfiguration, classHierarchy); + var evaluatorConfiguration = serializer.FromFile(configFile, classHierarchy); + + IInjector evaluatorInjector = TangFactory.GetTang().NewInjector(evaluatorConfiguration); + + LOGGER.Log(Level.Info, String.Format(CultureInfo.CurrentCulture, + "Evaluator Configuration is deserialized from file {0}:", configFile)); + try + { + _taskConfiguration = evaluatorInjector.GetNamedInstance<InitialTaskConfiguration, string>(); + } + catch (InjectionException) + { + LOGGER.Log(Level.Info, "InitialTaskConfiguration is not set in Evaluator.config."); + } + + try + { + _rootContextConfiguration = evaluatorInjector.GetNamedInstance<RootContextConfiguration, string>(); + } + catch (InjectionException) + { + LOGGER.Log(Level.Warning, "RootContextConfiguration is not set in Evaluator.config."); + } + + try + { + _rootServiceConfiguration = evaluatorInjector.GetNamedInstance<RootServiceConfiguration, string>(); + } + catch (InjectionException) + { + LOGGER.Log(Level.Info, "RootServiceConfiguration is not set in Evaluator.config."); + } + + _applicationId = evaluatorInjector.GetNamedInstance<ApplicationIdentifier, string>(); + _remoteId = evaluatorInjector.GetNamedInstance<DriverRemoteIdentifier, string>(); + _evaluatorId = evaluatorInjector.GetNamedInstance<EvaluatorIdentifier, string>(); + _errorHandlerRid = evaluatorInjector.GetNamedInstance<ErrorHandlerRid, string>(); + _launchId = evaluatorInjector.GetNamedInstance<LaunchId, string>(); } } @@ -83,7 +114,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils { get { - _taskConfiguration = _taskConfiguration ?? GetSettingValue(Constants.TaskConfiguration); return _taskConfiguration; } } @@ -107,7 +137,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils { get { - _evaluatorId = _evaluatorId ?? 
GetSettingValue(Constants.EvaluatorIdentifier); return _evaluatorId; } } @@ -116,25 +145,38 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils { get { - _applicationId = _applicationId ?? GetSettingValue(Constants.ApplicationIdentifier); return _applicationId; } } - public string ErrorHandlerRID + public string ErrorHandlerRid { get { - _errorHandlerRid = _errorHandlerRid ?? GetSettingValue(Constants.ErrorHandlerRID); return _errorHandlerRid; } } + public string RemoteId + { + get + { + return _remoteId; + } + } + + public string LaunchId + { + get + { + return _launchId; + } + } + public string RootContextConfigurationString { get { - _rootContextConfiguration = _rootContextConfiguration ?? GetSettingValue(Constants.RootContextConfiguration); return _rootContextConfiguration; } } @@ -161,7 +203,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils { get { - _rootServiceConfiguration = _rootServiceConfiguration ?? GetSettingValue(Constants.RootServiceConfiguration); return _rootServiceConfiguration; } } @@ -181,18 +222,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils new ServiceConfiguration( rootServiceConfigString)); } - } - - private string GetSettingValue(string settingKey) - { - ConfigurationEntry configurationEntry = - _avroConfiguration.Bindings.SingleOrDefault(b => b.key.EndsWith(settingKey, StringComparison.OrdinalIgnoreCase)); - if (configurationEntry == null) - { - return string.Empty; - } - - return configurationEntry.value; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs index be2c0f8..7078955 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs +++ 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs @@ -50,7 +50,7 @@ namespace Org.Apache.REEF.Evaluator.Tests var eId = evaluatorConfigurations.EvaluatorId; var aId = evaluatorConfigurations.ApplicationId; - var rId = evaluatorConfigurations.ErrorHandlerRID; + var rId = evaluatorConfigurations.ErrorHandlerRid; Logger.Log(Level.Info, "EvaluatorId = " + eId); Logger.Log(Level.Info, "ApplicationId = " + aId); http://git-wip-us.apache.org/repos/asf/reef/blob/f4d19ca4/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs index 4a60be4..96da694 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs @@ -19,27 +19,22 @@ using System; using System.Collections.Generic; -using System.Configuration; using System.Diagnostics; +using System.Globalization; using System.IO; using System.Linq; using System.Threading; -using Org.Apache.REEF.Common.Protobuf.ReefProtocol; +using Org.Apache.REEF.Common.Files; using Org.Apache.REEF.Common.Runtime.Evaluator; -using Org.Apache.REEF.Common.Runtime.Evaluator.Context; using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; -using Org.Apache.REEF.Common.Services; -using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Evaluator.Exceptions; +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.InjectionPlan; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; using Org.Apache.REEF.Wake.Time.Runtime; using Org.Apache.REEF.Wake.Time.Runtime.Event; @@ -48,83 +43,51 @@ namespace Org.Apache.REEF.Evaluator public 
sealed class Evaluator { private static Logger logger = Logger.GetLogger(typeof(Evaluator)); - private static int heartbeatPeriodInMs = Constants.DefaultEvaluatorHeartbeatPeriodInMs; - private static int heartbeatMaxRetry = Constants.DefaultEvaluatorHeartbeatMaxRetry; - private static IInjector injector; - private static EvaluatorConfigurations evaluatorConfig; + private readonly RuntimeClock _clock; + + [Inject] + private Evaluator( + RuntimeClock clock, + EvaluatorRuntime evaluatorRuntime, + CustomTraceListeners customTraceListeners, + CustomTraceLevel customTraceLevel) + { + _clock = clock; + SetCustomTraceListeners(customTraceListeners, customTraceLevel); + SetRuntimeHandlers(evaluatorRuntime, clock); + } + + private void Run() + { + _clock.Run(); + } + /// <summary> + /// The command line to run it is "evaluator.exe evaluator.config" + /// </summary> + /// <param name="args"></param> public static void Main(string[] args) { try { - Console.WriteLine("START: {0} Evaluator::InitInjector.", DateTime.Now); - Stopwatch timer = new Stopwatch(); - InitInjector(); - SetCustomTraceListeners(); // logger is reset by this. - timer.Stop(); - Console.WriteLine("EXIT: {0} Evaluator::InitInjector. 
Duration: [{1}].", DateTime.Now, timer.Elapsed); - - using (logger.LogScope("Evaluator::Main")) + if (args.Count() != 1) { - if (IsDebuggingEnabled()) - { - AttachDebugger(); - } - - // Register our exception handler - AppDomain.CurrentDomain.UnhandledException += UnhandledExceptionHandler; - - // Fetch some settings from the ConfigurationManager - SetHeartbeatPeriod(); - SetHeartbeatMaxRetry(); - - // Parse the command line - // The error handler RID should now be written in the configuration file instead - if (args.Count() != 1) - { - var e = new InvalidOperationException("must supply only the evaluator config file!"); - Utilities.Diagnostics.Exceptions.Throw(e, logger); - } - - // evaluator configuration file - string evaluatorConfigurationPath = args[0]; - - // Parse the evaluator configuration. - evaluatorConfig = new EvaluatorConfigurations(evaluatorConfigurationPath); - - string rId = evaluatorConfig.ErrorHandlerRID; - ContextConfiguration rootContextConfiguration = evaluatorConfig.RootContextConfiguration; - Optional<TaskConfiguration> rootTaskConfig = evaluatorConfig.TaskConfiguration; - Optional<ServiceConfiguration> rootServiceConfig = evaluatorConfig.RootServiceConfiguration; - - // remoteManager used as client-only in evaluator - IRemoteManager<REEFMessage> remoteManager = injector.GetInstance<IRemoteManagerFactory>().GetInstance(new REEFMessageCodec()); - IRemoteIdentifier remoteId = new SocketRemoteIdentifier(NetUtilities.ParseIpEndpoint(rId)); - - RuntimeClock clock = InstantiateClock(); - logger.Log(Level.Info, "Application Id: " + evaluatorConfig.ApplicationId); - EvaluatorSettings evaluatorSettings = new EvaluatorSettings( - evaluatorConfig.ApplicationId, - evaluatorConfig.EvaluatorId, - heartbeatPeriodInMs, - heartbeatMaxRetry, - rootContextConfiguration, - clock, - remoteManager, - injector); - - HeartBeatManager heartBeatManager = new HeartBeatManager(evaluatorSettings, remoteId); - ContextManager contextManager = new 
ContextManager(heartBeatManager, rootServiceConfig, rootTaskConfig); - EvaluatorRuntime evaluatorRuntime = new EvaluatorRuntime(contextManager, heartBeatManager); + var e = new InvalidOperationException("Must supply only the evaluator.config file!"); + Utilities.Diagnostics.Exceptions.Throw(e, logger); + } - // TODO: replace with injectionFuture - heartBeatManager._evaluatorRuntime = evaluatorRuntime; - heartBeatManager._contextManager = contextManager; + if (IsDebuggingEnabled()) + { + AttachDebugger(); + } + AppDomain.CurrentDomain.UnhandledException += UnhandledExceptionHandler; - SetRuntimeHandlers(evaluatorRuntime, clock); + Evaluator evaluator = TangFactory.GetTang() + .NewInjector(ReadClrBridgeConfiguration(), ReadEvaluatorConfiguration(args[0])) + .GetInstance<Evaluator>(); - clock.Run(); - } + evaluator.Run(); + logger.Log(Level.Info, "Evaluator is returned from Run()"); } catch (Exception e) { @@ -162,67 +125,16 @@ namespace Org.Apache.REEF.Evaluator } /// <summary> - /// Sets the heartbeat period from the ConfigurationManager - /// </summary> - private static void SetHeartbeatPeriod() - { - var heartbeatPeriodFromConfig = ConfigurationManager.AppSettings["EvaluatorHeartbeatPeriodInMs"]; - - var heartbeatPeriod = 0; - - if (!string.IsNullOrWhiteSpace(heartbeatPeriodFromConfig) && - int.TryParse(heartbeatPeriodFromConfig, out heartbeatPeriod)) - { - heartbeatPeriodInMs = heartbeatPeriod; - } - logger.Log(Level.Verbose, - "Evaluator heartbeat period set to be " + heartbeatPeriodInMs + " milliSeconds."); - } - - /// <summary> - /// Sets the heartbeat retry count from the ConfigurationManager - /// </summary> - private static void SetHeartbeatMaxRetry() - { - var maxHeartbeatRetry = 0; - var heartbeatMaxRetryFromConfig = - ConfigurationManager.AppSettings["EvaluatorHeartbeatRetryMaxTimes"]; - - if (!string.IsNullOrWhiteSpace(heartbeatMaxRetryFromConfig) && - int.TryParse(heartbeatMaxRetryFromConfig, out maxHeartbeatRetry)) - { - heartbeatMaxRetry = 
maxHeartbeatRetry; - } - logger.Log(Level.Verbose, - "Evaluator heartbeat max retry set to be " + heartbeatMaxRetry + " times."); - } - - /// <summary> - /// Instantiates the root injector of the Evaluator. - /// </summary> - /// <exception cref="EvaluatorInjectorInstantiationException">If the injector cannot be instantiated.</exception> - private static void InitInjector() - { - try - { - injector = TangFactory.GetTang().NewInjector(ReadEvaluatorConfiguration()); - } - catch (Exception e) - { - throw new EvaluatorInjectorInstantiationException(e); - } - } - - /// <summary> /// Reads the Evaluator Configuration. /// </summary> /// <exception cref="EvaluatorConfigurationFileNotFoundException">When the configuration file cannot be found.</exception> /// <exception cref="EvaluatorConfigurationParseException">When the configuration file exists, but can't be deserialized.</exception> /// <returns></returns> - private static IConfiguration ReadEvaluatorConfiguration() + private static IConfiguration ReadClrBridgeConfiguration() { - string clrRuntimeConfigurationFile = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", - Common.Constants.ClrBridgeRuntimeConfiguration); + var clrRuntimeConfigurationFile = Path.Combine(Directory.GetCurrentDirectory(), "reef", "global", + new REEFFileNames().GetClrBridgeConfigurationName()); + if (!File.Exists(clrRuntimeConfigurationFile)) { throw new EvaluatorConfigurationFileNotFoundException(clrRuntimeConfigurationFile); @@ -230,7 +142,10 @@ namespace Org.Apache.REEF.Evaluator try { - return new AvroConfigurationSerializer().FromFile(clrRuntimeConfigurationFile); + var clrDriverConfig = new AvroConfigurationSerializer().FromFile(clrRuntimeConfigurationFile); + logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, + "Clr Driver Configuration is deserialized from file {0}:", clrRuntimeConfigurationFile)); + return clrDriverConfig; } catch (Exception e) { @@ -238,44 +153,36 @@ namespace Org.Apache.REEF.Evaluator } } - 
/// <summary> - /// Instantiates the RuntimeClock - /// </summary> - /// <exception cref="ClockInstantiationException">When the clock can't be instantiated.</exception> - /// <returns></returns> - private static RuntimeClock InstantiateClock() + private static IConfiguration ReadEvaluatorConfiguration(string evaluatorConfigFile) { - IConfiguration clockConfiguration = new ConfigurationModuleBuilder().Build().Build(); - try + if (string.IsNullOrWhiteSpace(evaluatorConfigFile)) { - return TangFactory.GetTang().NewInjector(clockConfiguration).GetInstance<RuntimeClock>(); + Utilities.Diagnostics.Exceptions.Throw(new ArgumentNullException("configFile"), logger); } - catch (Exception exception) + if (!File.Exists(evaluatorConfigFile)) { - throw new ClockInstantiationException("Unable to instantiate the clock", exception); - } + Utilities.Diagnostics.Exceptions.Throw(new FileNotFoundException("cannot find file " + evaluatorConfigFile), logger); + } + + AvroConfigurationSerializer serializer = new AvroConfigurationSerializer(); + var classHierarchy = TangFactory.GetTang() + .GetClassHierarchy(new string[] { typeof(ApplicationIdentifier).Assembly.GetName().Name }); + var evaluatorConfiguration = serializer.FromFile(evaluatorConfigFile, classHierarchy); + + logger.Log(Level.Info, string.Format(CultureInfo.CurrentCulture, + "Evaluator Configuration is deserialized from file {0}:", evaluatorConfigFile)); + + return evaluatorConfiguration; } - private static void SetCustomTraceListeners() + private static void SetCustomTraceListeners(CustomTraceListeners customTraceListener, CustomTraceLevel traceLevel) { - ISet<TraceListener> customTraceListeners; - try - { - customTraceListeners = injector.GetInstance<CustomTraceListeners>().Listeners; - } - catch (Exception e) - { - Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, logger); - - // custom trace listener not set properly, use empty set - customTraceListeners = new HashSet<TraceListener>(); - } + ISet<TraceListener> 
customTraceListeners = customTraceListener.Listeners; foreach (TraceListener listener in customTraceListeners) { Logger.AddTraceListener(listener); } logger = Logger.GetLogger(typeof(Evaluator)); - CustomTraceLevel traceLevel = injector.GetInstance<CustomTraceLevel>(); Logger.SetCustomLevel(traceLevel.TraceLevel); }
