Repository: reef Updated Branches: refs/heads/master 1f965d794 -> 4ddbc29be
[REEF-1289] Add an EvaluatorConfiguration for the .NET Evaluator This addressed the issue by * Add an EvaluatorConfiguration to the .NET evaluator and pipe it through the Java bridge. JIRA: [REEF-1289](https://issues.apache.org/jira/browse/REEF-1289) This closes #908 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4ddbc29b Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4ddbc29b Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4ddbc29b Branch: refs/heads/master Commit: 4ddbc29be88177677df1d23da159ec07c45a58fd Parents: 1f965d7 Author: Andrew Chung <[email protected]> Authored: Mon Mar 28 11:28:54 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Tue Mar 29 15:21:39 2016 -0700 ---------------------------------------------------------------------- .../AllocatedEvaluatorClr2Java.cpp | 20 ++- lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h | 8 +- .../Runtime/Evaluator/Context/ContextManager.cs | 38 ++++- .../Evaluator/Context/RootContextLauncher.cs | 50 ++----- .../Runtime/Evaluator/EvaluatorSettings.cs | 150 +------------------ .../Evaluator/Utils/NamedparameterAlias.cs | 5 + .../Clr2java/IAllocatedEvaluatorClr2Java.cs | 9 +- .../Bridge/Events/AllocatedEvaluator.cs | 72 +++------ .../EvaluatorConfigurationsTests.cs | 29 +++- lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs | 10 +- .../App.config | 32 ++++ ...rg.Apache.REEF.Examples.DriverRestart.csproj | 7 +- .../javabridge/AllocatedEvaluatorBridge.java | 35 ++++- .../evaluator/AllocatedEvaluatorImpl.java | 70 ++++++--- .../evaluator/EvaluatorConfiguration.java | 3 + .../parameters/EvaluatorConfiguration.java | 33 ++++ 16 files changed, 280 insertions(+), 291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp index 8b64538..397f536 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/AllocatedEvaluatorClr2Java.cpp @@ -68,11 +68,11 @@ namespace Org { } } - void AllocatedEvaluatorClr2Java::SubmitContext(String^ contextConfigStr) { + void AllocatedEvaluatorClr2Java::SubmitContext(String^ evaluatorConfigStr, String^ contextConfigStr) { ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::SubmitContext"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassAllocatedEvaluator = env->GetObjectClass(_jobjectAllocatedEvaluator); - jmethodID jmidSubmitContext = env->GetMethodID(jclassAllocatedEvaluator, "submitContextString", "(Ljava/lang/String;)V"); + jmethodID jmidSubmitContext = env->GetMethodID(jclassAllocatedEvaluator, "submitContextString", "(Ljava/lang/String;Ljava/lang/String;)V"); if (jmidSubmitContext == NULL) { ManagedLog::LOGGER->Log("jmidSubmitContext is NULL"); @@ -81,15 +81,16 @@ namespace Org { env->CallObjectMethod( _jobjectAllocatedEvaluator, jmidSubmitContext, + JavaStringFromManagedString(env, evaluatorConfigStr), JavaStringFromManagedString(env, contextConfigStr)); ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::SubmitContext"); } - void AllocatedEvaluatorClr2Java::SubmitContextAndTask(String^ contextConfigStr, String^ taskConfigStr) { + void AllocatedEvaluatorClr2Java::SubmitContextAndTask(String^ evaluatorConfigStr, String^ contextConfigStr, String^ taskConfigStr) { ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::SubmitContextAndTask"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassAllocatedEvaluator = env->GetObjectClass(_jobjectAllocatedEvaluator); - jmethodID jmidSubmitContextAndTask = env->GetMethodID(jclassAllocatedEvaluator, "submitContextAndTaskString", "(Ljava/lang/String;Ljava/lang/String;)V"); + jmethodID jmidSubmitContextAndTask = env->GetMethodID(jclassAllocatedEvaluator, "submitContextAndTaskString", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V"); if (jmidSubmitContextAndTask == NULL) { ManagedLog::LOGGER->Log("jmidSubmitContextAndTask is NULL"); @@ -98,16 +99,17 @@ namespace Org { env->CallObjectMethod( _jobjectAllocatedEvaluator, jmidSubmitContextAndTask, + JavaStringFromManagedString(env, evaluatorConfigStr), JavaStringFromManagedString(env, contextConfigStr), JavaStringFromManagedString(env, taskConfigStr)); ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::SubmitContextAndTask"); } - void AllocatedEvaluatorClr2Java::SubmitContextAndService(String^ contextConfigStr, String^ serviceConfigStr) { + void AllocatedEvaluatorClr2Java::SubmitContextAndService(String^ evaluatorConfigStr, String^ contextConfigStr, String^ serviceConfigStr) { ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::SubmitContextAndService"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassAllocatedEvaluator = env->GetObjectClass(_jobjectAllocatedEvaluator); - jmethodID jmidSubmitContextAndService = env->GetMethodID(jclassAllocatedEvaluator, "submitContextAndServiceString", "(Ljava/lang/String;Ljava/lang/String;)V"); + jmethodID jmidSubmitContextAndService = env->GetMethodID(jclassAllocatedEvaluator, "submitContextAndServiceString", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V"); if (jmidSubmitContextAndService == NULL) { ManagedLog::LOGGER->Log("jmidSubmitContextAndService is NULL"); @@ -116,16 +118,17 @@ namespace Org { env->CallObjectMethod( _jobjectAllocatedEvaluator, jmidSubmitContextAndService, + JavaStringFromManagedString(env, evaluatorConfigStr), JavaStringFromManagedString(env, contextConfigStr), JavaStringFromManagedString(env, serviceConfigStr)); ManagedLog::LOGGER->LogStop("AllocatedEvaluatorClr2Java::SubmitContextAndService"); } - void AllocatedEvaluatorClr2Java::SubmitContextAndServiceAndTask(String^ contextConfigStr, String^ serviceConfigStr, String^ taskConfigStr) { + void AllocatedEvaluatorClr2Java::SubmitContextAndServiceAndTask(String^ evaluatorConfigStr, String^ contextConfigStr, String^ serviceConfigStr, String^ taskConfigStr) { ManagedLog::LOGGER->LogStart("AllocatedEvaluatorClr2Java::SubmitContextAndServiceAndTask"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassAllocatedEvaluator = env->GetObjectClass(_jobjectAllocatedEvaluator); - jmethodID jmidSubmitContextAndServiceAndTask = env->GetMethodID(jclassAllocatedEvaluator, "submitContextAndServiceAndTaskString", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V"); + jmethodID jmidSubmitContextAndServiceAndTask = env->GetMethodID(jclassAllocatedEvaluator, "submitContextAndServiceAndTaskString", "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V"); if (jmidSubmitContextAndServiceAndTask == NULL) { ManagedLog::LOGGER->Log("jmidSubmitContextAndServiceAndTask is NULL"); @@ -134,6 +137,7 @@ namespace Org { env->CallObjectMethod( _jobjectAllocatedEvaluator, jmidSubmitContextAndServiceAndTask, + JavaStringFromManagedString(env, evaluatorConfigStr), JavaStringFromManagedString(env, contextConfigStr), JavaStringFromManagedString(env, serviceConfigStr), JavaStringFromManagedString(env, taskConfigStr)); http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index 3254640..21a2f4a 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -62,10 +62,10 @@ namespace Org { AllocatedEvaluatorClr2Java(JNIEnv *env, jobject jallocatedEvaluator); ~AllocatedEvaluatorClr2Java(); !AllocatedEvaluatorClr2Java(); - virtual void SubmitContextAndTask(String^ contextConfigStr, String^ taskConfigStr); - virtual void SubmitContext(String^ contextConfigStr); - virtual void SubmitContextAndService(String^ contextConfigStr, String^ serviceConfigStr); - virtual void SubmitContextAndServiceAndTask(String^ contextConfigStr, String^ serviceConfigStr, String^ taskConfigStr); + virtual void SubmitContextAndTask(String^ evaluatorConfigStr, String^ contextConfigStr, String^ taskConfigStr); + virtual void SubmitContext(String^ evaluatorConfigStr, String^ contextConfigStr); + virtual void SubmitContextAndService(String^ evaluatorConfigStr, String^ contextConfigStr, String^ serviceConfigStr); + virtual void SubmitContextAndServiceAndTask(String^ evaluatorConfigStr, String^ contextConfigStr, String^ serviceConfigStr, String^ taskConfigStr); virtual void OnError(String^ message); virtual void Close(); virtual String^ GetId(); http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index 7d16ed4..f959cca 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -22,9 +22,11 @@ using System.Globalization; using System.Linq; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Task; +using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; @@ -42,20 +44,42 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context [Inject] private ContextManager( IHeartBeatManager heartBeatManager, - EvaluatorSettings evaluatorSettings, - AvroConfigurationSerializer serializer) + AvroConfigurationSerializer serializer, + [Parameter(typeof(RootContextConfiguration))] string rootContextConfiguration, + [Parameter(typeof(RootServiceConfiguration))] string rootServiceConfiguration) + : this(heartBeatManager, serializer, serializer.FromString(rootContextConfiguration), + serializer.FromString(rootServiceConfiguration), Optional<IConfiguration>.Empty()) + { + } + + [Inject] + private ContextManager( + IHeartBeatManager heartBeatManager, + AvroConfigurationSerializer serializer, + [Parameter(typeof(RootContextConfiguration))] string rootContextConfiguration, + [Parameter(typeof(RootServiceConfiguration))] string rootServiceConfiguration, + [Parameter(typeof(InitialTaskConfiguration))] string initialTaskConfiguration) + : this(heartBeatManager, serializer, serializer.FromString(rootContextConfiguration), + serializer.FromString(rootServiceConfiguration), Optional<IConfiguration>.Of(serializer.FromString(initialTaskConfiguration))) + { + } + + private ContextManager( + IHeartBeatManager heartBeatManager, + AvroConfigurationSerializer serializer, + IConfiguration rootContextConfiguration, + IConfiguration rootServiceConfiguration, + Optional<IConfiguration> initialTaskConfiguration) { - // TODO[JIRA REEF-217]: Inject base Injector and pass Injector to RootContextLauncher using (LOGGER.LogFunction("ContextManager::ContextManager")) { _heartBeatManager = heartBeatManager; _serializer = serializer; _rootContextLauncher = new RootContextLauncher( - evaluatorSettings.RootContextId, - evaluatorSettings.RootContextConfig, - evaluatorSettings.RootServiceConfiguration, - evaluatorSettings.RootTaskConfiguration, + rootContextConfiguration, + rootServiceConfiguration, + initialTaskConfiguration, heartBeatManager); } } http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs index 60e749c..2f18816 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Globalization; +using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -39,17 +40,24 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context private ISet<object> _services; private ContextRuntime _rootContext; - public RootContextLauncher(string id, IConfiguration contextConfiguration, - Optional<IConfiguration> rootServiceConfig, Optional<IConfiguration> rootTaskConfig, IHeartBeatManager heartbeatManager) + public RootContextLauncher( + IConfiguration contextConfiguration, + IConfiguration rootServiceConfig, + Optional<IConfiguration> rootTaskConfig, + IHeartBeatManager heartbeatManager) { - Id = id; _rootContextConfiguration = contextConfiguration; - _rootServiceInjector = InjectServices(rootServiceConfig); + _rootServiceInjector = TangFactory.GetTang().NewInjector(rootServiceConfig); + Id = _rootServiceInjector + .ForkInjector(contextConfiguration) + .GetNamedInstance<ContextConfigurationOptions.ContextIdentifier, string>(); + _services = _rootServiceInjector.GetNamedInstance<ServicesSet, ISet<object>>(); + Logger.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "injected service(s)")); _rootServiceInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, heartbeatManager); RootTaskConfig = rootTaskConfig; } - public Optional<IConfiguration> RootTaskConfig { get; set; } + public Optional<IConfiguration> RootTaskConfig { get; private set; } public string Id { get; private set; } @@ -62,37 +70,5 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context return _rootContext; } - - private IInjector InjectServices(Optional<IConfiguration> serviceConfig) - { - // TODO[JIRA REEF-217]: Use base injector for the Evaluator here instead. - IInjector rootServiceInjector; - - if (serviceConfig.IsPresent()) - { - rootServiceInjector = TangFactory.GetTang().NewInjector(serviceConfig.Value); - try - { - _services = rootServiceInjector.GetNamedInstance<ServicesSet, ISet<object>>(); - Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "injected service(s)")); - } - catch (Exception e) - { - var errorMessage = string.Format(CultureInfo.InvariantCulture, - "Failed to inject service: encountered error {1} with message [{0}] and stack trace:[{2}]", e, - e.Message, e.StackTrace); - Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Failed to instantiate service.", Logger); - var ex = new InvalidOperationException(errorMessage, e); - Utilities.Diagnostics.Exceptions.Throw(ex, Logger); - } - } - else - { - rootServiceInjector = TangFactory.GetTang().NewInjector(); - Logger.Log(Level.Info, "no service provided for injection."); - } - - return rootServiceInjector; - } } } http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs index be7d625..d561cd3 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs @@ -15,18 +15,13 @@ // specific language governing permissions and limitations // under the License. -using System; -using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Evaluator; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Parameters; using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Exceptions; -using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Time; @@ -40,19 +35,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private readonly string _applicationId; private readonly string _evaluatorId; - private readonly string _rootContextId; private readonly int _heartBeatPeriodInMs; private readonly int _maxHeartbeatRetries; private readonly IClock _clock; private readonly IRemoteManager<REEFMessage> _remoteManager; - private readonly IInjector _injector; - private readonly IConfiguration _rootContextConfig; - private readonly AvroConfigurationSerializer _serializer; - private readonly Optional<IConfiguration> _rootTaskConfiguration; - private readonly Optional<IConfiguration> _rootServiceConfiguration; - - private EvaluatorOperationState _operationState; - private INameClient _nameClient; /// <summary> /// Constructor with @@ -61,8 +47,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <param name="evaluatorId"></param> /// <param name="heartbeatPeriodInMs"></param> /// <param name="maxHeartbeatRetries"></param> - /// <param name="rootContextConfigString"></param> - /// <param name="serializer"></param> /// <param name="clock"></param> /// <param name="remoteManagerFactory"></param> /// <param name="reefMessageCodec"></param> @@ -73,13 +57,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator [Parameter(typeof(EvaluatorIdentifier))] string evaluatorId, [Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs, [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries, - [Parameter(typeof(RootContextConfiguration))] string rootContextConfigString, - AvroConfigurationSerializer serializer, RuntimeClock clock, IRemoteManagerFactory remoteManagerFactory, REEFMessageCodec reefMessageCodec, IInjector injector) : - this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, rootContextConfigString, serializer, + this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, clock, remoteManagerFactory, reefMessageCodec, injector, null) { } @@ -90,53 +72,28 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator [Parameter(typeof(EvaluatorIdentifier))] string evaluatorId, [Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs, [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries, - [Parameter(typeof(RootContextConfiguration))] string rootContextConfigString, - AvroConfigurationSerializer serializer, RuntimeClock clock, IRemoteManagerFactory remoteManagerFactory, REEFMessageCodec reefMessageCodec, IInjector injector, INameClient nameClient) { - _serializer = serializer; - _injector = injector; _applicationId = applicationId; _evaluatorId = evaluatorId; _heartBeatPeriodInMs = heartbeatPeriodInMs; _maxHeartbeatRetries = maxHeartbeatRetries; _clock = clock; - if (string.IsNullOrWhiteSpace(rootContextConfigString)) - { - Utilities.Diagnostics.Exceptions.Throw( - new ArgumentException("empty or null rootContextConfigString"), Logger); - } - _rootContextConfig = _serializer.FromString(rootContextConfigString); - - _rootContextId = injector.ForkInjector(_rootContextConfig).GetNamedInstance<ContextConfigurationOptions.ContextIdentifier, string>(); - _rootTaskConfiguration = CreateTaskConfiguration(); - _rootServiceConfiguration = CreateRootServiceConfiguration(); - _remoteManager = remoteManagerFactory.GetInstance(reefMessageCodec); - _operationState = EvaluatorOperationState.OPERATIONAL; - _nameClient = nameClient; + EvaluatorInjector = injector; + OperationState = EvaluatorOperationState.OPERATIONAL; + NameClient = nameClient; } /// <summary> /// Operator State. Can be set and get. /// </summary> - public EvaluatorOperationState OperationState - { - get - { - return _operationState; - } - - set - { - _operationState = value; - } - } + public EvaluatorOperationState OperationState { get; set; } /// <summary> /// Return Evaluator Id got from Evaluator Configuration @@ -150,14 +107,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } /// <summary> - /// Returns the root context ID. - /// </summary> - public string RootContextId - { - get { return _rootContextId; } - } - - /// <summary> /// Return HeartBeatPeriodInMs from NamedParameter /// </summary> public int HeartBeatPeriodInMs @@ -191,30 +140,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } /// <summary> - /// return Root Context Configuration passed from Evaluator configuration - /// </summary> - public IConfiguration RootContextConfig - { - get { return _rootContextConfig; } - } - - /// <summary> - /// return Root Task Configuration passed from Evaluator configuration - /// </summary> - public Optional<IConfiguration> RootTaskConfiguration - { - get { return _rootTaskConfiguration; } - } - - /// <summary> - /// return Root Service Configuration passed from Evaluator configuration - /// </summary> - public Optional<IConfiguration> RootServiceConfiguration - { - get { return _rootServiceConfiguration; } - } - - /// <summary> /// return Runtime Clock injected from the constructor /// </summary> public IClock RuntimeClock @@ -228,75 +153,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <summary> /// Return Name Client /// </summary> - public INameClient NameClient - { - get - { - return _nameClient; - } - - set - { - _nameClient = value; - } - } + public INameClient NameClient { get; set; } /// <summary> /// return Remote manager /// </summary> public IRemoteManager<REEFMessage> RemoteManager { - get - { - return _remoteManager; - } - } - - /// <summary> - /// Injector that contains clrDrive configuration and Evaluator configuration - /// </summary> - public IInjector EvaluatorInjector - { - get - { - return _injector; - } - } - - private Optional<IConfiguration> CreateTaskConfiguration() - { - string taskConfigString = null; - try - { - taskConfigString = _injector.GetNamedInstance<InitialTaskConfiguration, string>(); - } - catch (InjectionException) - { - Logger.Log(Level.Info, "InitialTaskConfiguration is not set in Evaluator.config."); - } - return string.IsNullOrEmpty(taskConfigString) - ? Optional<IConfiguration>.Empty() - : Optional<IConfiguration>.Of(_serializer.FromString(taskConfigString)); + get { return _remoteManager; } } - private Optional<IConfiguration> CreateRootServiceConfiguration() - { - string rootServiceConfigString = null; - try - { - rootServiceConfigString = _injector.GetNamedInstance<RootServiceConfiguration, string>(); - } - catch (InjectionException) - { - Logger.Log(Level.Info, "RootServiceConfiguration is not set in Evaluator.config."); - } - - if (string.IsNullOrEmpty(rootServiceConfigString)) - { - return Optional<IConfiguration>.Empty(); - } - - return Optional<IConfiguration>.Of(_serializer.FromString(rootServiceConfigString)); - } + public IInjector EvaluatorInjector { get; private set; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/NamedparameterAlias.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/NamedparameterAlias.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/NamedparameterAlias.cs index ca02db5..17a524a 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/NamedparameterAlias.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Utils/NamedparameterAlias.cs @@ -35,6 +35,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Utils { } + [NamedParameter(alias: "org.apache.reef.runtime.common.evaluator.parameters.EvaluatorConfiguration", aliasLanguage: Language.Java)] + internal sealed class EvaluatorConfiguration : Name<string> + { + } + [NamedParameter(alias: "org.apache.reef.runtime.common.evaluator.parameters.InitialTaskConfiguration", aliasLanguage: Language.Java)] internal sealed class InitialTaskConfiguration : Name<string> { http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IAllocatedEvaluatorClr2Java.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IAllocatedEvaluatorClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IAllocatedEvaluatorClr2Java.cs index 5ecd722..de52d4a 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IAllocatedEvaluatorClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IAllocatedEvaluatorClr2Java.cs @@ -23,13 +23,14 @@ namespace Org.Apache.REEF.Driver.Bridge.Clr2java [Private, Interop("AllocatedEvaluatorClr2Java.cpp", "Clr2JavaImpl.h")] public interface IAllocatedEvaluatorClr2Java : IClr2Java { - void SubmitContextAndTask(string contextConfigStr, string taskConfigStr); + void SubmitContextAndTask(string evaluatorConfigStr, string contextConfigStr, string taskConfigStr); - void SubmitContext(string contextConfigStr); + void SubmitContext(string evaluatorConfigStr, string contextConfigStr); - void SubmitContextAndService(string contextConfigStr, string serviceConfigStr); + void SubmitContextAndService(string evaluatorConfigStr, string contextConfigStr, string serviceConfigStr); - void SubmitContextAndServiceAndTask(string contextConfigStr, string serviceConfigStr, string taskConfigStr); + void SubmitContextAndServiceAndTask( + string evaluatorConfigStr, string contextConfigStr, string serviceConfigStr, string taskConfigStr); void Close(); http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs index 28943fe..d15e9b3 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/AllocatedEvaluator.cs @@ -24,7 +24,9 @@ using Org.Apache.REEF.Driver.Bridge.Clr2java; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Driver.Bridge.Events @@ -38,12 +40,20 @@ namespace Org.Apache.REEF.Driver.Bridge.Events private IEvaluatorDescriptor _evaluatorDescriptor; - private readonly ISet<IConfigurationProvider> _configurationProviders; + private readonly string _evaluatorConfigStr; public AllocatedEvaluator(IAllocatedEvaluatorClr2Java clr2Java, ISet<IConfigurationProvider> configurationProviders) { - _configurationProviders = configurationProviders; _serializer = new AvroConfigurationSerializer(); + + var evaluatorConfig = TangFactory.GetTang().NewConfigurationBuilder().Build(); + foreach (var configurationProvider in configurationProviders) + { + evaluatorConfig = Configurations.Merge(evaluatorConfig, configurationProvider.GetConfiguration()); + } + + _evaluatorConfigStr = _serializer.ToString(evaluatorConfig); + Clr2Java = clr2Java; Id = Clr2Java.GetId(); ProcessNewEvaluator(); @@ -68,59 +78,28 @@ namespace Org.Apache.REEF.Driver.Bridge.Events Common.Context.ContextConfiguration.ConfigurationModule.Set( Common.Context.ContextConfiguration.Identifier, "RootContext_" + this.Id).Build(); - SubmitContextAndTask(contextConfiguration, taskConfiguration); + Clr2Java.SubmitContextAndTask(_evaluatorConfigStr, _serializer.ToString(contextConfiguration), _serializer.ToString(taskConfiguration)); } + public void SubmitContext(IConfiguration contextConfiguration) { - LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContext"); - string context = _serializer.ToString(contextConfiguration); - LOGGER.Log(Level.Verbose, "serialized contextConfiguration: " + context); - Clr2Java.SubmitContext(context); + Clr2Java.SubmitContext(_evaluatorConfigStr, _serializer.ToString(contextConfiguration)); } public void SubmitContextAndTask(IConfiguration contextConfiguration, IConfiguration taskConfiguration) { - LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndTask"); - - // TODO: Change this to service configuration when REEF-289(https://issues.apache.org/jira/browse/REEF-289) is fixed. - taskConfiguration = MergeWithConfigurationProviders(taskConfiguration); - string context = _serializer.ToString(contextConfiguration); - string task = _serializer.ToString(taskConfiguration); - - LOGGER.Log(Level.Verbose, "serialized contextConfiguration: " + context); - LOGGER.Log(Level.Verbose, "serialized taskConfiguration: " + task); - - Clr2Java.SubmitContextAndTask(context, task); + Clr2Java.SubmitContextAndTask(_evaluatorConfigStr, _serializer.ToString(contextConfiguration), _serializer.ToString(taskConfiguration)); } public void SubmitContextAndService(IConfiguration contextConfiguration, IConfiguration serviceConfiguration) { - LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndService"); - - var serviceConf = MergeWithConfigurationProviders(serviceConfiguration); - string context = _serializer.ToString(contextConfiguration); - string service = _serializer.ToString(serviceConf); - - LOGGER.Log(Level.Verbose, "serialized contextConfiguration: " + context); - LOGGER.Log(Level.Verbose, "serialized serviceConfiguration: " + service); - - Clr2Java.SubmitContextAndService(context, service); + Clr2Java.SubmitContextAndService(_evaluatorConfigStr, _serializer.ToString(contextConfiguration), _serializer.ToString(serviceConfiguration)); } public void SubmitContextAndServiceAndTask(IConfiguration contextConfiguration, IConfiguration serviceConfiguration, IConfiguration taskConfiguration) { - LOGGER.Log(Level.Info, "AllocatedEvaluator::SubmitContextAndServiceAndTask"); - - var serviceConf = MergeWithConfigurationProviders(serviceConfiguration); - string context = _serializer.ToString(contextConfiguration); - string service = _serializer.ToString(serviceConf); - string task = _serializer.ToString(taskConfiguration); - - LOGGER.Log(Level.Verbose, "serialized contextConfiguration: " + context); - LOGGER.Log(Level.Verbose, "serialized serviceConfiguration: " + service); - LOGGER.Log(Level.Verbose, "serialized taskConfiguration: " + task); - - Clr2Java.SubmitContextAndServiceAndTask(context, service, task); + Clr2Java.SubmitContextAndServiceAndTask( + _evaluatorConfigStr, _serializer.ToString(contextConfiguration), _serializer.ToString(serviceConfiguration), _serializer.ToString(taskConfiguration)); } public IEvaluatorDescriptor GetEvaluatorDescriptor() @@ -174,18 +153,5 @@ namespace Org.Apache.REEF.Driver.Bridge.Events } } } - - private IConfiguration MergeWithConfigurationProviders(IConfiguration configuration) - { - IConfiguration config = configuration; - if (_configurationProviders != null) - { - foreach (var configurationProvider in _configurationProviders) - { - config = Configurations.Merge(config, configurationProvider.GetConfiguration()); - } - } - return config; - } } } http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs index 980f512..7eade73 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs @@ -16,6 +16,7 @@ // under the License. using System.Collections.Generic; +using Org.Apache.REEF.Common.Evaluator; using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Common.Tasks; @@ -66,26 +67,38 @@ namespace Org.Apache.REEF.Evaluator.Tests } /// <summary> - /// This test is to deserialize evaluator configuration for Task, Context and Service + /// This test is to deserialize evaluator configuration for Evaluator, Task, Context and Service /// using alias if the parameter in the configuration cannot be found in the class hierarchy. /// </summary> [Fact] [Trait("Priority", "0")] [Trait("Category", "Unit")] - public void TestDeserializeContextServiceTaskWithAlias() + public void TestDeserializeEvaluatorContextServiceTaskWithAlias() { var serializer = new AvroConfigurationSerializer(); var config = DeserializeConfigWithAlias(); var evaluatorInjector = TangFactory.GetTang().NewInjector(config); + var evaluatorConfigString = evaluatorInjector.GetNamedInstance<EvaluatorConfiguration, string>(); var taskConfigString = evaluatorInjector.GetNamedInstance<InitialTaskConfiguration, string>(); var contextConfigString = evaluatorInjector.GetNamedInstance<RootContextConfiguration, string>(); var serviceConfigString = evaluatorInjector.GetNamedInstance<RootServiceConfiguration, string>(); + var evaluatorClassHierarchy = TangFactory.GetTang().GetClassHierarchy(new string[] + { + typeof(DefaultLocalHttpDriverConnection).Assembly.GetName().Name + }); + + var evaluatorConfig = serializer.FromString(evaluatorConfigString, evaluatorClassHierarchy); + var fullEvaluatorInjector = evaluatorInjector.ForkInjector(evaluatorConfig); + + Assert.True(fullEvaluatorInjector.GetInstance<IDriverConnection>() is DefaultLocalHttpDriverConnection); + var contextClassHierarchy = TangFactory.GetTang().GetClassHierarchy(new string[] { typeof(Common.Context.ContextConfigurationOptions.ContextIdentifier).Assembly.GetName().Name }); + var contextConfig = serializer.FromString(contextConfigString, contextClassHierarchy); var taskClassHierarchy = TangFactory.GetTang().GetClassHierarchy(new string[] @@ -93,6 +106,7 @@ namespace Org.Apache.REEF.Evaluator.Tests typeof(ITask).Assembly.GetName().Name, typeof(HelloTask).Assembly.GetName().Name }); + var taskConfig = serializer.FromString(taskConfigString, taskClassHierarchy); var serviceClassHierarchy = TangFactory.GetTang().GetClassHierarchy(new string[] @@ -102,7 +116,7 @@ namespace Org.Apache.REEF.Evaluator.Tests }); var serviceConfig = serializer.FromString(serviceConfigString, serviceClassHierarchy); - var contextInjector = evaluatorInjector.ForkInjector(contextConfig); + var contextInjector = fullEvaluatorInjector.ForkInjector(contextConfig); string contextId = contextInjector.GetNamedInstance<Common.Context.ContextConfigurationOptions.ContextIdentifier, string>(); Assert.True(contextId.StartsWith(ContextIdPrefix)); @@ -151,6 +165,15 @@ namespace Org.Apache.REEF.Evaluator.Tests new ConfigurationEntry("org.apache.reef.runtime.common.evaluator.parameters.EvaluatorIdentifier", "Node-2-1447450298921")); + var evaluatorConfiguration = TangFactory.GetTang().NewConfigurationBuilder() + .BindImplementation(GenericType<IDriverConnection>.Class, GenericType<DefaultLocalHttpDriverConnection>.Class) + .Build(); + + var evaluatorString = serializer.ToString(evaluatorConfiguration); + configurationEntries.Add( + new ConfigurationEntry("org.apache.reef.runtime.common.evaluator.parameters.EvaluatorConfiguration", + evaluatorString)); + var taskConfiguration = TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, "HelloTask") .Set(TaskConfiguration.Task, GenericType<HelloTask>.Class) http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs index 225e13d..82b12f3 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs @@ -80,8 +80,14 @@ namespace Org.Apache.REEF.Evaluator } AppDomain.CurrentDomain.UnhandledException += UnhandledExceptionHandler; - Evaluator evaluator = TangFactory.GetTang() - .NewInjector(ReadClrBridgeConfiguration(), ReadEvaluatorConfiguration(args[0])) + var fullEvaluatorConfiguration = ReadEvaluatorConfiguration(args[0]); + var injector = TangFactory.GetTang().NewInjector(fullEvaluatorConfiguration); + var serializer = injector.GetInstance<AvroConfigurationSerializer>(); + var rootEvaluatorConfiguration = + serializer.FromString(injector.GetNamedInstance<EvaluatorConfiguration, string>()); + var evaluator = injector.ForkInjector( + ReadClrBridgeConfiguration(), + rootEvaluatorConfiguration) .GetInstance<Evaluator>(); evaluator.Run(); http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/App.config ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/App.config b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/App.config new file mode 100644 index 0000000..72dbca3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/App.config @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="utf-8" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<configuration> + <startup> + <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" /> + </startup> + <runtime> + <assemblyBinding xmlns="urn:schemas-microsoft-com:asm.v1"> + <dependentAssembly> + <assemblyIdentity name="Newtonsoft.Json" publicKeyToken="30ad4fe6b2a6aeed" culture="neutral" /> + <bindingRedirect oldVersion="0.0.0.0-7.0.0.0" newVersion="7.0.0.0" /> + </dependentAssembly> + </assemblyBinding> + </runtime> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj index 02fb9d6..a41364d 100644 --- a/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj +++ b/lang/cs/Org.Apache.REEF.Examples.DriverRestart/Org.Apache.REEF.Examples.DriverRestart.csproj @@ -10,11 +10,11 @@ <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> <FileAlignment>512</FileAlignment> <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..</SolutionDir> - </PropertyGroup> + </PropertyGroup> <Import Project="$(SolutionDir)\build.props" /> <PropertyGroup> <RestorePackages>true</RestorePackages> - <BuildPackage>false</BuildPackage> + <BuildPackage>false</BuildPackage> <UseVSHostingProcess>false</UseVSHostingProcess> </PropertyGroup> <ItemGroup> @@ -31,6 +31,7 @@ <Compile Include="Properties\AssemblyInfo.cs" /> </ItemGroup> <ItemGroup> + <None Include="App.config" /> <None Include="packages.config" /> <None Include="Readme.md" /> </ItemGroup> @@ -81,4 +82,4 @@ </PropertyGroup> <Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" /> </Target> -</Project> +</Project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java index bdf87f4..9f2a57a 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java @@ -54,11 +54,16 @@ public final class AllocatedEvaluatorBridge extends NativeBridge implements Iden /** * Bridge function for REEF .NET to submit context and task configurations for the allocated evaluator. + * @param evaluatorConfigurationString the evaluator configuration from .NET. * @param contextConfigurationString the context configuration from .NET. * @param taskConfigurationString the task configuration from .NET. */ - public void submitContextAndTaskString(final String contextConfigurationString, + public void submitContextAndTaskString(final String evaluatorConfigurationString, + final String contextConfigurationString, final String taskConfigurationString) { + if (evaluatorConfigurationString.isEmpty()) { + throw new RuntimeException("empty evaluatorConfigurationString provided."); + } if (contextConfigurationString.isEmpty()) { throw new RuntimeException("empty contextConfigurationString provided."); } @@ -69,14 +74,19 @@ public final class AllocatedEvaluatorBridge extends NativeBridge implements Iden //submitContextAndTask(final String contextConfiguration, //final String taskConfiguration) is not exposed in the interface. Therefore cast is necessary. ((AllocatedEvaluatorImpl)jallocatedEvaluator) - .submitContextAndTask(contextConfigurationString, taskConfigurationString); + .submitContextAndTask(evaluatorConfigurationString, contextConfigurationString, taskConfigurationString); } /** * Bridge function for REEF .NET to submit context configuration for the allocated evaluator. + * @param evaluatorConfigurationString the evaluator configuration from .NET. * @param contextConfigurationString the context configuration from .NET. */ - public void submitContextString(final String contextConfigurationString) { + public void submitContextString(final String evaluatorConfigurationString, + final String contextConfigurationString) { + if (evaluatorConfigurationString.isEmpty()) { + throw new RuntimeException("empty evaluatorConfigurationString provided."); + } if (contextConfigurationString.isEmpty()) { throw new RuntimeException("empty contextConfigurationString provided."); } @@ -84,16 +94,22 @@ public final class AllocatedEvaluatorBridge extends NativeBridge implements Iden //When submit over the bridge, we would keep the contextConfigurationString as serialized strings. //public void submitContext(final String contextConfiguration) // is not exposed in the interface. Therefore cast is necessary. - ((AllocatedEvaluatorImpl)jallocatedEvaluator).submitContext(contextConfigurationString); + ((AllocatedEvaluatorImpl)jallocatedEvaluator).submitContext( + evaluatorConfigurationString, contextConfigurationString); } /** * Bridge function for REEF .NET to submit context and service configurations for the allocated evaluator. + * @param evaluatorConfigurationString the evaluator configuration from .NET. * @param contextConfigurationString the context configuration from .NET. * @param serviceConfigurationString the service configuration from .NET. */ - public void submitContextAndServiceString(final String contextConfigurationString, + public void submitContextAndServiceString(final String evaluatorConfigurationString, + final String contextConfigurationString, final String serviceConfigurationString) { + if (evaluatorConfigurationString.isEmpty()) { + throw new RuntimeException("empty evaluatorConfigurationString provided."); + } if (contextConfigurationString.isEmpty()) { throw new RuntimeException("empty contextConfigurationString provided."); } @@ -105,19 +121,24 @@ public final class AllocatedEvaluatorBridge extends NativeBridge implements Iden //public void submitContextAndService(final String contextConfiguration, //final String serviceConfiguration) is not exposed in the interface. Therefore cast is necessary. ((AllocatedEvaluatorImpl)jallocatedEvaluator) - .submitContextAndService(contextConfigurationString, serviceConfigurationString); + .submitContextAndService(evaluatorConfigurationString, contextConfigurationString, serviceConfigurationString); } /** * Bridge function for REEF .NET to submit context, service. and task configurations for the allocated evaluator. + * @param evaluatorConfigurationString the evaluator configuration from .NET. * @param contextConfigurationString the context configuration from .NET. * @param serviceConfigurationString the service configuration from .NET. * @param taskConfigurationString the task configuration from .NET. */ public void submitContextAndServiceAndTaskString( + final String evaluatorConfigurationString, final String contextConfigurationString, final String serviceConfigurationString, final String taskConfigurationString) { + if (evaluatorConfigurationString.isEmpty()) { + throw new RuntimeException("empty evaluatorConfigurationString provided."); + } if (contextConfigurationString.isEmpty()) { throw new RuntimeException("empty contextConfigurationString provided."); } @@ -132,7 +153,7 @@ public final class AllocatedEvaluatorBridge extends NativeBridge implements Iden //submitContextAndServiceAndTask(final Configuration contextConfiguration, final Configuration serviceConfiguration, //final String taskConfiguration) is not exposed in the interface. Therefore cast is necessary. ((AllocatedEvaluatorImpl)jallocatedEvaluator).submitContextAndServiceAndTask( - contextConfigurationString, serviceConfigurationString, taskConfigurationString); + evaluatorConfigurationString, contextConfigurationString, serviceConfigurationString, taskConfigurationString); } /** http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java index e57d7a8..c7fab90 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java @@ -103,15 +103,16 @@ public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { * Submit Task with configuration strings. * This method should be called from bridge and the configuration strings are * serialized at .Net side. + * @param evaluatorConfiguration * @param taskConfiguration */ - public void submitTask(final String taskConfiguration) { + public void submitTask(final String evaluatorConfiguration, final String taskConfiguration) { final Configuration contextConfiguration = ContextConfiguration.CONF .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId()) .build(); final String contextConfigurationString = this.configurationSerializer.toString(contextConfiguration); this.launchWithConfigurationString( - contextConfigurationString, Optional.<String>empty(), Optional.of(taskConfiguration)); + evaluatorConfiguration, contextConfigurationString, Optional.<String>empty(), Optional.of(taskConfiguration)); } @Override @@ -128,10 +129,12 @@ public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { * Submit Context with configuration strings. * This method should be called from bridge and the configuration strings are * serialized at .Net side. + * @param evaluatorConfiguration * @param contextConfiguration */ - public void submitContext(final String contextConfiguration) { - launchWithConfigurationString(contextConfiguration, Optional.<String>empty(), Optional.<String>empty()); + public void submitContext(final String evaluatorConfiguration, final String contextConfiguration) { + launchWithConfigurationString(evaluatorConfiguration, contextConfiguration, Optional.<String>empty(), + Optional.<String>empty()); } @Override @@ -144,12 +147,15 @@ public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { * Submit Context and Service with configuration strings. * This method should be called from bridge and the configuration strings are * serialized at .Net side. + * @param evaluatorConfiguration * @param contextConfiguration * @param serviceConfiguration */ - public void submitContextAndService(final String contextConfiguration, + public void submitContextAndService(final String evaluatorConfiguration, + final String contextConfiguration, final String serviceConfiguration) { - launchWithConfigurationString(contextConfiguration, Optional.of(serviceConfiguration), Optional.<String>empty()); + launchWithConfigurationString(evaluatorConfiguration, contextConfiguration, + Optional.of(serviceConfiguration), Optional.<String>empty()); } @Override @@ -162,12 +168,15 @@ public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { * Submit Context and Task with configuration strings. * This method should be called from bridge and the configuration strings are * serialized at .Net side. + * @param evaluatorConfiguration * @param contextConfiguration * @param taskConfiguration */ - public void submitContextAndTask(final String contextConfiguration, + public void submitContextAndTask(final String evaluatorConfiguration, + final String contextConfiguration, final String taskConfiguration) { - this.launchWithConfigurationString(contextConfiguration, Optional.<String>empty(), Optional.of(taskConfiguration)); + this.launchWithConfigurationString(evaluatorConfiguration, contextConfiguration, + Optional.<String>empty(), Optional.of(taskConfiguration)); } @Override @@ -181,15 +190,17 @@ public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { * Submit Context and Service with configuration strings. * This method should be called from bridge and the configuration strings are * serialized at .Net side + * @param evaluatorConfiguration * @param contextConfiguration * @param serviceConfiguration * @param taskConfiguration */ - public void submitContextAndServiceAndTask(final String contextConfiguration, + public void submitContextAndServiceAndTask(final String evaluatorConfiguration, + final String contextConfiguration, final String serviceConfiguration, final String taskConfiguration) { - launchWithConfigurationString( - contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration)); + launchWithConfigurationString(evaluatorConfiguration, contextConfiguration, + Optional.of(serviceConfiguration), Optional.of(taskConfiguration)); } @Override @@ -223,17 +234,21 @@ public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { * This method should be called from bridge and the configuration strings are * serialized at .Net side * @param contextConfiguration + * @param evaluatorConfiguration * @param serviceConfiguration * @param taskConfiguration */ - private void launchWithConfigurationString(final String contextConfiguration, - final Optional<String> serviceConfiguration, - final Optional<String> taskConfiguration) { + private void launchWithConfigurationString( + final String evaluatorConfiguration, + final String contextConfiguration, + final Optional<String> serviceConfiguration, + final Optional<String> taskConfiguration) { try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) { - final Configuration evaluatorConfiguration = - makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration); + final Configuration submissionEvaluatorConfiguration = + makeEvaluatorConfiguration( + contextConfiguration, Optional.of(evaluatorConfiguration), serviceConfiguration, taskConfiguration); - resourceBuildAndLaunch(evaluatorConfiguration); + resourceBuildAndLaunch(submissionEvaluatorConfiguration); } } @@ -268,27 +283,30 @@ public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { if (taskConfiguration.isPresent()) { taskConfigurationString = Optional.of(this.configurationSerializer.toString(taskConfiguration.get())); } else { - taskConfigurationString = Optional.<String>empty(); + taskConfigurationString = Optional.empty(); } final Optional<Configuration> mergedServiceConfiguration = makeRootServiceConfiguration(serviceConfiguration); if (mergedServiceConfiguration.isPresent()) { final String serviceConfigurationString = this.configurationSerializer.toString(mergedServiceConfiguration.get()); - return makeEvaluatorConfiguration( - contextConfigurationString, Optional.of(serviceConfigurationString), taskConfigurationString); + return makeEvaluatorConfiguration(contextConfigurationString, Optional.<String>empty(), + Optional.of(serviceConfigurationString), taskConfigurationString); } else { - return makeEvaluatorConfiguration(contextConfigurationString, Optional.<String>empty(), taskConfigurationString); + return makeEvaluatorConfiguration( + contextConfigurationString, Optional.<String>empty(), Optional.<String>empty(), taskConfigurationString); } } /** * Make configuration for Evaluator. * @param contextConfiguration + * @param evaluatorConfiguration * @param serviceConfiguration * @param taskConfiguration * @return Configuration */ private Configuration makeEvaluatorConfiguration(final String contextConfiguration, + final Optional<String> evaluatorConfiguration, final Optional<String> serviceConfiguration, final Optional<String> taskConfiguration) { @@ -305,9 +323,19 @@ public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { .set(EvaluatorConfiguration.ROOT_CONTEXT_CONFIGURATION, contextConfiguration); // Add the (optional) service configuration + if (evaluatorConfiguration.isPresent()) { + evaluatorConfigurationModule = evaluatorConfigurationModule + .set(EvaluatorConfiguration.EVALUATOR_CONFIGURATION, evaluatorConfiguration.get()); + } + + // Add the (optional) service configuration if (serviceConfiguration.isPresent()) { evaluatorConfigurationModule = evaluatorConfigurationModule .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, serviceConfiguration.get()); + } else { + evaluatorConfigurationModule = evaluatorConfigurationModule + .set(EvaluatorConfiguration.ROOT_SERVICE_CONFIGURATION, + this.configurationSerializer.toString(Tang.Factory.getTang().newConfigurationBuilder().build())); } // Add the (optional) task configuration http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java index 1316e33..978537d 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/EvaluatorConfiguration.java @@ -41,6 +41,7 @@ public final class EvaluatorConfiguration extends ConfigurationModuleBuilder { public static final RequiredParameter<String> DRIVER_REMOTE_IDENTIFIER = new RequiredParameter<>(); public static final RequiredParameter<String> EVALUATOR_IDENTIFIER = new RequiredParameter<>(); public static final RequiredParameter<String> ROOT_CONTEXT_CONFIGURATION = new RequiredParameter<>(); + public static final OptionalParameter<String> EVALUATOR_CONFIGURATION = new OptionalParameter<>(); public static final OptionalParameter<String> ROOT_SERVICE_CONFIGURATION = new OptionalParameter<>(); public static final OptionalParameter<String> TASK_CONFIGURATION = new OptionalParameter<>(); public static final OptionalParameter<Integer> HEARTBEAT_PERIOD = new OptionalParameter<>(); @@ -54,6 +55,8 @@ public final class EvaluatorConfiguration extends ConfigurationModuleBuilder { .bindNamedParameter(ErrorHandlerRID.class, DRIVER_REMOTE_IDENTIFIER) .bindNamedParameter(EvaluatorIdentifier.class, EVALUATOR_IDENTIFIER) .bindNamedParameter(HeartbeatPeriod.class, HEARTBEAT_PERIOD) + .bindNamedParameter(org.apache.reef.runtime.common.evaluator.parameters.EvaluatorConfiguration.class, + EVALUATOR_CONFIGURATION) .bindNamedParameter(RootContextConfiguration.class, ROOT_CONTEXT_CONFIGURATION) .bindNamedParameter(InitialTaskConfiguration.class, TASK_CONFIGURATION) .bindNamedParameter(RootServiceConfiguration.class, ROOT_SERVICE_CONFIGURATION) http://git-wip-us.apache.org/repos/asf/reef/blob/4ddbc29b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorConfiguration.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorConfiguration.java new file mode 100644 index 0000000..95ac94a --- /dev/null +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/evaluator/parameters/EvaluatorConfiguration.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.common.evaluator.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The evaluator configuration. + * An alias has been defined for this named parameter in .Net REEF as + * Org.Apache.REEF.Common.Runtime.Evaluator.Utils.EvaluatorConfiguration + */ +@NamedParameter(doc = "The evaluator configuration.") +public final class EvaluatorConfiguration implements Name<String> { + private EvaluatorConfiguration() { + } +}
