Repository: reef Updated Branches: refs/heads/master c50f45bcf -> 1cd8a09d3
[REEF-1062] Update TaskConfiguration to properly use Tang Injection This addressed the issue by * Added tests for TaskRuntime. * Added interface for HeartbeatManager. * Switch TaskRuntime and TaskStatus to use Tang injection. * Added deprecation messages. JIRA: [REEF-1062](https://issues.apache.org/jira/browse/REEF-1062) This closes #830 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/1cd8a09d Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/1cd8a09d Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/1cd8a09d Branch: refs/heads/master Commit: 1cd8a09d3362b9e0fd084bf3abf5dcb864d1941e Parents: c50f45b Author: Andrew Chung <[email protected]> Authored: Mon Jan 25 16:36:05 2016 -0800 Committer: Julia Wang <[email protected]> Committed: Thu Feb 11 11:14:21 2016 -0800 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 2 + .../Properties/AssemblyInfo.cs | 9 +- .../Runtime/Evaluator/Context/ContextManager.cs | 13 +- .../Runtime/Evaluator/Context/ContextRuntime.cs | 42 +++- .../Evaluator/Context/RootContextLauncher.cs | 9 +- .../Runtime/Evaluator/EvaluatorRuntime.cs | 4 +- .../Runtime/Evaluator/EvaluatorSettings.cs | 36 ++- .../Runtime/Evaluator/HeartBeatManager.cs | 7 +- .../Runtime/Evaluator/IHeartBeatManager.cs | 69 ++++++ .../Runtime/Evaluator/Task/TaskRuntime.cs | 106 ++++----- .../Runtime/Evaluator/Task/TaskStatus.cs | 25 +- .../DefaultDriverConnectionMessageHandler.cs | 48 ++++ .../Defaults/DefaultDriverMessageHandler.cs | 6 +- .../Tasks/IDriverConnectionMessageHandler.cs | 3 + .../Tasks/IDriverMessageHandler.cs | 4 +- .../Tasks/TaskConfiguration.cs | 46 +--- .../EvaluatorTests.cs | 14 +- .../Org.Apache.REEF.Evaluator.Tests.csproj | 9 + .../TaskRuntimeTests.cs | 232 +++++++++++++++++++ .../packages.config | 4 +- lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs | 1 + 21 files changed, 545 insertions(+), 144 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 65c85e3..35a4fee 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -141,6 +141,7 @@ under the License. <Compile Include="Runtime\Evaluator\EvaluatorRuntime.cs" /> <Compile Include="Runtime\Evaluator\EvaluatorSettings.cs" /> <Compile Include="Runtime\Evaluator\HeartBeatManager.cs" /> + <Compile Include="Runtime\Evaluator\IHeartBeatManager.cs" /> <Compile Include="Runtime\Evaluator\Parameters\EvaluatorHeartbeatPeriodInMs.cs" /> <Compile Include="Runtime\Evaluator\Parameters\HeartbeatMaxRetry.cs" /> <Compile Include="Runtime\Evaluator\ReefMessageProtoObserver.cs" /> @@ -159,6 +160,7 @@ under the License. 
<Compile Include="Services\IService.cs" /> <Compile Include="Services\ServiceConfiguration.cs" /> <Compile Include="Services\ServicesConfigurationOptions.cs" /> + <Compile Include="Tasks\Defaults\DefaultDriverConnectionMessageHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultDriverMessageHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultTaskMessageSource.cs" /> <Compile Include="Tasks\DriverConnectionState.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs b/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs index fbd2685..cbe3f43 100644 --- a/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs +++ b/lang/cs/Org.Apache.REEF.Common/Properties/AssemblyInfo.cs @@ -83,4 +83,11 @@ using System.Runtime.InteropServices; "00240000048000009400000006020000002400005253413100040000010001005df3e621d886a9" + "9c03469d0f93a9f5d45aa2c883f50cd158759e93673f759ec4657fd84cc79d2db38ef1a2d914cc" + "b7c717846a897e11dd22eb260a7ce2da2dccf0263ea63e2b3f7dac24f28882aa568ef544341d17" + - "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] \ No newline at end of file + "618392a1095f4049ad079d4f4f0b429bb535699155fd6a7652ec7d6c1f1ba2b560f11ef3a86b5945d288cf")] + +// Allow NSubstitute to create proxy implementations +[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2, PublicKey=002400000480000" + + "0940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a36" + + "02f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0ba" + + "c1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924c" + + "ceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7")] \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index 143c81e..5f9b779 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -35,7 +35,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context internal sealed class ContextManager : IDisposable { private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextManager)); - private readonly HeartBeatManager _heartBeatManager; + private readonly IHeartBeatManager _heartBeatManager; private readonly RootContextLauncher _rootContextLauncher; private readonly object _contextLock = new object(); private readonly AvroConfigurationSerializer _serializer; @@ -43,10 +43,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context [Inject] private ContextManager( - HeartBeatManager heartBeatManager, + IHeartBeatManager heartBeatManager, EvaluatorSettings evaluatorSettings, AvroConfigurationSerializer serializer) { + // TODO[JIRA REEF-217]: Inject base Injector and pass Injector to RootContextLauncher using (LOGGER.LogFunction("ContextManager::ContextManager")) { _heartBeatManager = heartBeatManager; @@ -56,7 +57,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context evaluatorSettings.RootContextId, evaluatorSettings.RootContextConfig, evaluatorSettings.RootServiceConfiguration, - evaluatorSettings.RootTaskConfiguration); + evaluatorSettings.RootTaskConfiguration, + heartBeatManager); } } @@ -346,8 +348,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context string.Format(CultureInfo.InvariantCulture, "Task expected context '{0}' but the active context has Id 
'{1}'", expectedContextId, currentActiveContext.Id)); Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } - TaskConfiguration taskConfiguration = new TaskConfiguration(startTaskProto.configuration); - currentActiveContext.StartTask(taskConfiguration, expectedContextId, _heartBeatManager); + + var configuration = _serializer.FromString(startTaskProto.configuration); + currentActiveContext.StartTask(configuration, expectedContextId, _heartBeatManager); } } http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 21aac8c..0558c45 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -44,6 +44,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context // The parent context, if any. private readonly Optional<ContextRuntime> _parentContext; + // Flag indicating whether the ContextRuntime was constructed with the deprecated Constructor or not. + // TODO[JIRA REEF-1167]: Remove variable. + private readonly bool _deprecatedTaskStart; + // The child context, if any. 
private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty(); @@ -67,6 +71,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context _contextInjector = serviceInjector.ForkInjector(contextConfiguration); _contextLifeCycle = _contextInjector.GetInstance<ContextLifeCycle>(); _parentContext = parentContext; + _deprecatedTaskStart = false; _contextLifeCycle.Start(); } @@ -104,6 +109,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context Utilities.Diagnostics.Exceptions.Throw(ex, LOGGER); } + _deprecatedTaskStart = true; + // Trigger the context start events on contextInjector. _contextLifeCycle.Start(); } @@ -199,7 +206,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// <param name="taskConfiguration"></param> /// <param name="contextId"></param> /// <param name="heartBeatManager"></param> - public void StartTask(TaskConfiguration taskConfiguration, string contextId, HeartBeatManager heartBeatManager) + public void StartTask(IConfiguration taskConfiguration, string contextId, IHeartBeatManager heartBeatManager) { lock (_contextLifeCycle) { @@ -229,22 +236,45 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context var e = new InvalidOperationException("Attempting to instantiate a child context on a context that is not the topmost active context."); Utilities.Diagnostics.Exceptions.Throw(e, LOGGER); } + + var taskInjector = _contextInjector.ForkInjector(taskConfiguration); + + var taskRuntime = _deprecatedTaskStart + ? 
GetDeprecatedTaskRuntime(taskInjector, contextId, taskConfiguration, heartBeatManager) + : taskInjector.GetInstance<TaskRuntime>(); + try { - IInjector taskInjector = _contextInjector.ForkInjector(taskConfiguration.TangConfig); - LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration.ToString()); - TaskRuntime taskRuntime = new TaskRuntime(taskInjector, contextId, taskConfiguration.TaskId, heartBeatManager); - taskRuntime.RunTask(); _task = Optional<TaskRuntime>.Of(taskRuntime); + taskRuntime.RunTask(); } catch (Exception e) { - var ex = new TaskClientCodeException(taskConfiguration.TaskId, Id, "Unable to instantiate the new task", e); + var ex = new TaskClientCodeException(string.Empty, Id, "Unable to run the new task", e); Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER); } } } + private TaskRuntime GetDeprecatedTaskRuntime( + IInjector taskInjector, string contextId, IConfiguration taskConfiguration, IHeartBeatManager heartBeatManager) + { + var taskId = string.Empty; + try + { + taskId = taskInjector.GetNamedInstance<TaskConfigurationOptions.Identifier, string>(); + } + catch (Exception e) + { + var ex = new TaskClientCodeException(string.Empty, Id, "Unable to instantiate the new task", e); + Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Cannot get instance of Task ID: " + e.StackTrace, LOGGER); + } + + LOGGER.Log(Level.Info, "Trying to inject task with configuration" + taskConfiguration); + + return new TaskRuntime(taskInjector, contextId, taskId, heartBeatManager); + } + /// <summary> /// Close this context. If there is a child context, this recursively closes it before closing this context. If /// there is an Task currently running, that will be closed. 
http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs index 775da51..f78e1b7 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/RootContextLauncher.cs @@ -21,6 +21,7 @@ using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; @@ -40,15 +41,16 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context private ContextRuntime _rootContext = null; public RootContextLauncher(string id, IConfiguration contextConfiguration, - Optional<ServiceConfiguration> rootServiceConfig, Optional<TaskConfiguration> rootTaskConfig) + Optional<ServiceConfiguration> rootServiceConfig, Optional<IConfiguration> rootTaskConfig, IHeartBeatManager heartbeatManager) { Id = id; _rootContextConfiguration = contextConfiguration; _rootServiceInjector = InjectServices(rootServiceConfig); + _rootServiceInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, heartbeatManager); RootTaskConfig = rootTaskConfig; } - public Optional<TaskConfiguration> RootTaskConfig { get; set; } + public Optional<IConfiguration> RootTaskConfig { get; set; } public string Id { get; private set; } @@ -73,6 +75,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context private static IInjector InjectServices(Optional<ServiceConfiguration> serviceConfig) { + // TODO[JIRA REEF-217]: Use base injector for the Evaluator here instead. 
IInjector rootServiceInjector; if (serviceConfig.IsPresent()) @@ -100,4 +103,4 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context return rootServiceInjector; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs index 4f6359b..be26699 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs @@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private readonly string _evaluatorId; private readonly ContextManager _contextManager; - private readonly HeartBeatManager _heartBeatManager; + private readonly IHeartBeatManager _heartBeatManager; private readonly IClock _clock; private readonly IDisposable _evaluatorControlChannel; @@ -42,7 +42,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator [Inject] public EvaluatorRuntime( ContextManager contextManager, - HeartBeatManager heartBeatManager) + IHeartBeatManager heartBeatManager) { using (Logger.LogFunction("EvaluatorRuntime::EvaluatorRuntime")) { http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs index fe331da..c9b2a35 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs @@ -20,11 +20,9 @@ using Org.Apache.REEF.Common.Context; using 
Org.Apache.REEF.Common.Evaluator; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; -using Org.Apache.REEF.Common.Runtime.Evaluator.Context; using Org.Apache.REEF.Common.Runtime.Evaluator.Parameters; using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; using Org.Apache.REEF.Common.Services; -using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Formats; @@ -51,7 +49,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private readonly IInjector _injector; private readonly IConfiguration _rootContextConfig; private readonly AvroConfigurationSerializer _serializer; - private readonly Optional<TaskConfiguration> _rootTaskConfiguration; + private readonly Optional<IConfiguration> _rootTaskConfiguration; private readonly Optional<ServiceConfiguration> _rootServiceConfiguration; private EvaluatorOperationState _operationState; @@ -81,7 +79,25 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator RuntimeClock clock, IRemoteManagerFactory remoteManagerFactory, REEFMessageCodec reefMessageCodec, - IInjector injector) + IInjector injector) : + this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, rootContextConfigString, serializer, + clock, remoteManagerFactory, reefMessageCodec, injector, null) + { + } + + [Inject] + private EvaluatorSettings( + [Parameter(typeof(ApplicationIdentifier))] string applicationId, + [Parameter(typeof(EvaluatorIdentifier))] string evaluatorId, + [Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs, + [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries, + [Parameter(typeof(RootContextConfiguration))] string rootContextConfigString, + AvroConfigurationSerializer serializer, + RuntimeClock clock, + IRemoteManagerFactory remoteManagerFactory, + REEFMessageCodec reefMessageCodec, + IInjector injector, + INameClient nameClient) { _serializer = serializer; _injector = 
injector; @@ -105,7 +121,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator catch (InjectionException) { Logger.Log(Level.Info, "Using deprecated ContextConfiguration."); - + // TODO[JIRA REEF-1167]: Remove this catch. var deprecatedContextConfig = new Context.ContextConfiguration(rootContextConfigString); _rootContextConfig = deprecatedContextConfig; @@ -117,6 +133,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator _remoteManager = remoteManagerFactory.GetInstance(reefMessageCodec); _operationState = EvaluatorOperationState.OPERATIONAL; + _nameClient = nameClient; } /// <summary> @@ -198,7 +215,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <summary> /// return Root Task Configuration passed from Evaluator configuration /// </summary> - public Optional<TaskConfiguration> RootTaskConfiguration + public Optional<IConfiguration> RootTaskConfiguration { get { return _rootTaskConfiguration; } } @@ -260,7 +277,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } } - private Optional<TaskConfiguration> CreateTaskConfiguration() + private Optional<IConfiguration> CreateTaskConfiguration() { string taskConfigString = null; try @@ -272,9 +289,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator Logger.Log(Level.Info, "InitialTaskConfiguration is not set in Evaluator.config."); } return string.IsNullOrEmpty(taskConfigString) - ? Optional<TaskConfiguration>.Empty() - : Optional<TaskConfiguration>.Of( - new TaskConfiguration(taskConfigString)); + ? 
Optional<IConfiguration>.Empty() + : Optional<IConfiguration>.Of(_serializer.FromString(taskConfigString)); } private Optional<ServiceConfiguration> CreateRootServiceConfiguration() http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs index a121e94..3ff7167 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs @@ -17,7 +17,6 @@ using System; using System.Collections.Generic; -using System.Diagnostics.CodeAnalysis; using System.Globalization; using System.Linq; using System.Net; @@ -38,7 +37,7 @@ using Org.Apache.REEF.Wake.Time.Event; namespace Org.Apache.REEF.Common.Runtime.Evaluator { - internal sealed class HeartBeatManager : IObserver<Alarm> + internal sealed class HeartBeatManager : IHeartBeatManager { private static readonly Logger LOGGER = Logger.GetLogger(typeof(HeartBeatManager)); @@ -112,7 +111,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <summary> /// EvaluatorSettings contains the configuration data of the evaluators /// </summary> - internal EvaluatorSettings EvaluatorSettings + public EvaluatorSettings EvaluatorSettings { get { return _evaluatorSettings; } } @@ -159,7 +158,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } catch (Exception ex) { - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER); + Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER); } LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection); 
_evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY; http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/IHeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/IHeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/IHeartBeatManager.cs new file mode 100644 index 0000000..0b2b2f5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/IHeartBeatManager.cs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using Org.Apache.REEF.Common.Protobuf.ReefProtocol; +using Org.Apache.REEF.Common.Runtime.Evaluator.Context; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Time.Event; + +namespace Org.Apache.REEF.Common.Runtime.Evaluator +{ + [DefaultImplementation(typeof(HeartBeatManager))] + internal interface IHeartBeatManager : IObserver<Alarm> + { + /// <summary> + /// Return EvaluatorRuntime referenced from HeartBeatManager + /// </summary> + EvaluatorRuntime EvaluatorRuntime { get; } + + /// <summary> + /// Return ContextManager referenced from HeartBeatManager + /// </summary> + ContextManager ContextManager { get; } + + /// <summary> + /// EvaluatorSettings contains the configuration data of the evaluators + /// </summary> + EvaluatorSettings EvaluatorSettings { get; } + + void Send(EvaluatorHeartbeatProto evaluatorHeartbeatProto); + + /// <summary> + /// Assemble a complete new heartbeat and send it out. + /// </summary> + void OnNext(); + + /// <summary> + /// Called with a specific TaskStatus that must be delivered to the driver + /// </summary> + /// <param name="taskStatusProto"></param> + void OnNext(TaskStatusProto taskStatusProto); + + /// <summary> + /// Called with a specific ContextStatusProto that must be delivered to the driver + /// </summary> + /// <param name="contextStatusProto"></param> + void OnNext(ContextStatusProto contextStatusProto); + + /// <summary> + /// Called with a specific EvaluatorStatus that must be delivered to the driver + /// </summary> + /// <param name="evaluatorStatusProto"></param> + void OnNext(EvaluatorStatusProto evaluatorStatusProto); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
index 3520231..235eef7 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -19,10 +19,10 @@ using System; using System.Collections.Generic; using System.Globalization; using System.Threading; -using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; @@ -34,20 +34,35 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { private static readonly Logger Logger = Logger.GetLogger(typeof(TaskRuntime)); - private readonly IInjector _injector; private readonly TaskStatus _currentStatus; - private readonly Lazy<IDriverConnectionMessageHandler> _driverConnectionMessageHandler; - private readonly Lazy<IDriverMessageHandler> _driverMessageHandler; - private int taskRan = 0; - - public TaskRuntime(IInjector taskInjector, string contextId, string taskId, HeartBeatManager heartBeatManager) + private readonly Optional<IDriverConnectionMessageHandler> _driverConnectionMessageHandler; + private readonly Optional<IDriverMessageHandler> _driverMessageHandler; + private readonly ITask _userTask; + private int _taskRan = 0; + + [Inject] + private TaskRuntime( + ITask userTask, + IDriverMessageHandler driverMessageHandler, + IDriverConnectionMessageHandler driverConnectionMessageHandler, + TaskStatus taskStatus) { - _injector = taskInjector; + _currentStatus = taskStatus; + _driverMessageHandler = Optional<IDriverMessageHandler>.Of(driverMessageHandler); + _driverConnectionMessageHandler = Optional<IDriverConnectionMessageHandler>.Of(driverConnectionMessageHandler); + _userTask = userTask; + } + /// <summary> + /// TODO[JIRA REEF-1167]: Remove constructor. + /// </summary> + [Obsolete("Deprecated in 0.14. 
Will be removed.")] + public TaskRuntime(IInjector taskInjector, string contextId, string taskId, IHeartBeatManager heartBeatManager) + { var messageSources = Optional<ISet<ITaskMessageSource>>.Empty(); try { - ITaskMessageSource taskMessageSource = _injector.GetInstance<ITaskMessageSource>(); + var taskMessageSource = taskInjector.GetInstance<ITaskMessageSource>(); messageSources = Optional<ISet<ITaskMessageSource>>.Of(new HashSet<ITaskMessageSource> { taskMessageSource }); } catch (Exception e) @@ -56,43 +71,36 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task // do not rethrow since this is benign } + try { - heartBeatManager.EvaluatorSettings.NameClient = _injector.GetInstance<INameClient>(); + _driverConnectionMessageHandler = Optional<IDriverConnectionMessageHandler>.Of(taskInjector.GetInstance<IDriverConnectionMessageHandler>()); } catch (InjectionException) { - // do not rethrow since user is not required to provide name client - Logger.Log(Level.Warning, "Cannot inject name client from task configuration."); + Logger.Log(Level.Info, "User did not implement IDriverConnectionMessageHandler."); + _driverConnectionMessageHandler = Optional<IDriverConnectionMessageHandler>.Empty(); } - _driverConnectionMessageHandler = new Lazy<IDriverConnectionMessageHandler>(() => + try { - try - { - return _injector.GetInstance<IDriverConnectionMessageHandler>(); - } - catch (InjectionException) - { - Logger.Log(Level.Info, "User did not implement IDriverConnectionMessageHandler."); - } - - return null; - }); - - _driverMessageHandler = new Lazy<IDriverMessageHandler>(() => + _driverMessageHandler = Optional<IDriverMessageHandler>.Of(taskInjector.GetInstance<IDriverMessageHandler>()); + } + catch (InjectionException) { - try - { - return _injector.GetInstance<IDriverMessageHandler>(); - } - catch (InjectionException ie) - { - Utilities.Diagnostics.Exceptions.CaughtAndThrow(ie, Level.Error, "Received Driver message, but unable to inject handler for driver message ", 
Logger); - } + Logger.Log(Level.Info, "User did not implement IDriverMessageHandler."); + _driverMessageHandler = Optional<IDriverMessageHandler>.Empty(); + } - return null; - }); + try + { + _userTask = taskInjector.GetInstance<ITask>(); + } + catch (InjectionException ie) + { + const string errorMessage = "User did not implement IDriverMessageHandler."; + Utilities.Diagnostics.Exceptions.CaughtAndThrow(ie, Level.Error, errorMessage, Logger); + } Logger.Log(Level.Info, "task message source injected"); _currentStatus = new TaskStatus(heartBeatManager, contextId, taskId, messageSources); @@ -113,7 +121,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task /// </summary> public void RunTask() { - if (Interlocked.Exchange(ref taskRan, 1) != 0) + if (Interlocked.Exchange(ref _taskRan, 1) != 0) { // Return if we have already called RunTask throw new InvalidOperationException("TaskRun has already been called on TaskRuntime."); @@ -121,21 +129,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task // Send heartbeat such that user receives a TaskRunning message. 
_currentStatus.SetRunning(); - ITask userTask; - try - { - userTask = _injector.GetInstance<ITask>(); - } - catch (Exception e) - { - Utilities.Diagnostics.Exceptions.CaughtAndThrow(new InvalidOperationException("Unable to inject task.", e), Level.Error, "Unable to inject task.", Logger); - return; - } - + System.Threading.Tasks.Task.Run(() => { Logger.Log(Level.Info, "Calling into user's task."); - return userTask.Call(null); + return _userTask.Call(null); }).ContinueWith(runTask => { try @@ -167,7 +165,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } finally { - userTask.Dispose(); + if (_userTask != null) + { + _userTask.Dispose(); + } + runTask.Dispose(); } }); @@ -269,7 +271,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { Logger.Log(Level.Info, "TaskRuntime::OnNext(IDriverMessage value)"); - if (_driverMessageHandler.Value == null) + if (!_driverMessageHandler.IsPresent()) { return; } @@ -289,7 +291,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task /// </summary> internal void HandleDriverConnectionMessage(IDriverConnectionMessage message) { - if (_driverConnectionMessageHandler.Value == null) + if (!_driverConnectionMessageHandler.IsPresent()) { return; } http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs index 3c0e6b7..31ce771 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs @@ -18,8 +18,10 @@ using System; using System.Collections.Generic; using System.Globalization; +using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; +using 
Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Logging; @@ -31,7 +33,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task private readonly object _stateLock = new object(); private readonly TaskLifeCycle _taskLifeCycle; - private readonly HeartBeatManager _heartBeatManager; + private readonly IHeartBeatManager _heartBeatManager; private readonly Optional<ISet<ITaskMessageSource>> _evaluatorMessageSources; private readonly string _taskId; private readonly string _contextId; @@ -40,7 +42,26 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task private Optional<byte[]> _result = Optional<byte[]>.Empty(); private TaskState _state; - public TaskStatus(HeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources) + [Inject] + private TaskStatus( + [Parameter(typeof(ContextConfigurationOptions.ContextIdentifier))] string contextId, + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, + [Parameter(typeof(TaskConfigurationOptions.TaskMessageSources))] ISet<ITaskMessageSource> taskMessageSources, + IHeartBeatManager heartBeatManager) + { + _heartBeatManager = heartBeatManager; + _taskLifeCycle = new TaskLifeCycle(); + _evaluatorMessageSources = Optional<ISet<ITaskMessageSource>>.OfNullable(taskMessageSources); + State = TaskState.Init; + _taskId = taskId; + _contextId = contextId; + } + + /// <summary> + /// TODO[JIRA REEF-1167]: Remove constructor. + /// </summary> + [Obsolete("Deprecated in 0.14. 
Will be removed.")] + public TaskStatus(IHeartBeatManager heartBeatManager, string contextId, string taskId, Optional<ISet<ITaskMessageSource>> evaluatorMessageSources) { _heartBeatManager = heartBeatManager; _taskLifeCycle = new TaskLifeCycle(); http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverConnectionMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverConnectionMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverConnectionMessageHandler.cs new file mode 100644 index 0000000..d69740d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverConnectionMessageHandler.cs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Common.Tasks.Defaults +{ + public sealed class DefaultDriverConnectionMessageHandler : IDriverConnectionMessageHandler + { + private static readonly Logger Logger = Logger.GetLogger(typeof(DefaultDriverConnectionMessageHandler)); + + [Inject] + private DefaultDriverConnectionMessageHandler() + { + } + + public void OnNext(IDriverConnectionMessage value) + { + Logger.Log(Level.Verbose, "Driver connection state: {0}", value.State); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs index 3f9f7c0..5f45675 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultDriverMessageHandler.cs @@ -24,16 +24,16 @@ namespace Org.Apache.REEF.Common.Tasks.Defaults { public class DefaultDriverMessageHandler : IDriverMessageHandler { - private static readonly Logger LOGGER = Logger.GetLogger(typeof(DefaultDriverMessageHandler)); + private static readonly Logger Logger = Logger.GetLogger(typeof(DefaultDriverMessageHandler)); [Inject] - public DefaultDriverMessageHandler() + private DefaultDriverMessageHandler() { } public void Handle(IDriverMessage message) { - Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("No DriverMessage handler bound. 
Message received" + message), LOGGER); + Utilities.Diagnostics.Exceptions.Throw(new InvalidOperationException("No DriverMessage handler bound. Message received" + message), Logger); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs index c9d51b9..bbc352f 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverConnectionMessageHandler.cs @@ -16,6 +16,8 @@ // under the License. using System; +using Org.Apache.REEF.Common.Tasks.Defaults; +using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Common.Tasks { @@ -23,6 +25,7 @@ namespace Org.Apache.REEF.Common.Tasks /// The handler implementable by users to handle IDriverConnectionMessages, /// which notifies the Task when there is a change in driver connection state. /// </summary> + [DefaultImplementation(typeof(DefaultDriverConnectionMessageHandler))] public interface IDriverConnectionMessageHandler : IObserver<IDriverConnectionMessage> { } http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs index 23e60b7..c7a8ffd 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/IDriverMessageHandler.cs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+using Org.Apache.REEF.Common.Tasks.Defaults; using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Common.Tasks { - // [DefaultImplementation(typeof(DefaultTaskMessageSource))] + [DefaultImplementation(typeof(DefaultDriverMessageHandler))] public interface IDriverMessageHandler { void Handle(IDriverMessage message); http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs index c0ed32f..409a0de 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfiguration.cs @@ -16,13 +16,9 @@ // under the License. using System; -using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; -using System.Globalization; using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Formats.AvroConfigurationDataContract; -using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities.Logging; @@ -30,11 +26,8 @@ using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Tasks { - public class TaskConfiguration : ConfigurationModuleBuilder + public sealed class TaskConfiguration : ConfigurationModuleBuilder { - // this is a hack for getting the task identifier for now - public const string TaskIdentifier = "TaskConfigurationOptions+Identifier"; - /// <summary> /// The identifier of the task. 
/// </summary> @@ -95,32 +88,6 @@ namespace Org.Apache.REEF.Common.Tasks [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] public static readonly OptionalParameter<string> Memento = new OptionalParameter<string>(); - private static readonly Logger LOGGER = Logger.GetLogger(typeof(TaskConfiguration)); - - public TaskConfiguration() - : base() - { - } - - public TaskConfiguration(string configString) - { - TangConfig = new AvroConfigurationSerializer().FromString(configString); - AvroConfiguration avroConfiguration = AvroConfiguration.GetAvroConfigurationFromEmbeddedString(configString); - foreach (ConfigurationEntry config in avroConfiguration.Bindings) - { - if (config.key.Contains(TaskIdentifier)) - { - TaskId = config.value; - } - } - if (string.IsNullOrWhiteSpace(TaskId)) - { - string msg = "Required parameter TaskId not provided."; - LOGGER.Log(Level.Error, msg); - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Throw(new ArgumentException(msg), LOGGER); - } - } - public static ConfigurationModule ConfigurationModule { get @@ -139,16 +106,5 @@ namespace Org.Apache.REEF.Common.Tasks .Build(); } } - - public string TaskId { get; private set; } - - public IList<KeyValuePair<string, string>> Configurations { get; private set; } - - public IConfiguration TangConfig { get; private set; } - - public override string ToString() - { - return string.Format(CultureInfo.InvariantCulture, "TaskConfiguration - configurations: {0}", TangConfig.ToString()); - } } } http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorTests.cs index 3781699..73d4708 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorTests.cs +++ 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorTests.cs @@ -39,7 +39,7 @@ namespace Org.Apache.REEF.Evaluator.Tests public void CanInjectAndExecuteTask() { // to enforce that shell task dll be copied to output directory. - ShellTask tmpTask = new ShellTask("invalid"); + var tmpTask = new ShellTask("invalid"); Assert.NotNull(tmpTask); string tmp = Directory.GetCurrentDirectory(); @@ -56,21 +56,17 @@ namespace Org.Apache.REEF.Evaluator.Tests .Build()); cb.BindNamedParameter<ShellTask.Command, string>(GenericType<ShellTask.Command>.Class, "dir"); - IConfiguration taskConfiguration = cb.Build(); + var config = cb.Build(); + ITask task; - string taskConfig = serializer.ToString(taskConfiguration); - - ITask task = null; - TaskConfiguration config = new TaskConfiguration(taskConfig); - Assert.NotNull(config); try { - IInjector injector = TangFactory.GetTang().NewInjector(config.TangConfig); + IInjector injector = TangFactory.GetTang().NewInjector(config); task = (ITask)injector.GetInstance(typeof(ITask)); } catch (Exception e) { - throw new InvalidOperationException("unable to inject task with configuration: " + taskConfig, e); + throw new InvalidOperationException("unable to inject task with configuration: " + config, e); } byte[] bytes = task.Call(null); http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj b/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj index da0fe33..3f908a6 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj @@ -40,6 +40,14 @@ under the License. 
<BuildPackage>false</BuildPackage> </PropertyGroup> <ItemGroup> + <Reference Include="NSubstitute, Version=1.8.2.0, Culture=neutral, PublicKeyToken=92dd2e9066daa5ca, processorArchitecture=MSIL"> + <HintPath>..\packages\NSubstitute.1.8.2.0\lib\net45\NSubstitute.dll</HintPath> + <Private>True</Private> + </Reference> + <Reference Include="protobuf-net, Version=2.0.0.668, Culture=neutral, PublicKeyToken=257b51d87d2e4d67, processorArchitecture=MSIL"> + <HintPath>..\packages\protobuf-net.2.0.0.668\lib\net40\protobuf-net.dll</HintPath> + <Private>True</Private> + </Reference> <Reference Include="System" /> <Reference Include="xunit.abstractions, Version=2.0.0.0, Culture=neutral, PublicKeyToken=8d05b1bb7a6fdb6c, processorArchitecture=MSIL"> <HintPath>$(PackagesDir)\xunit.abstractions.2.0.0\lib\net35\xunit.abstractions.dll</HintPath> @@ -63,6 +71,7 @@ under the License. <Compile Include="EvaluatorConfigurationsTests.cs" /> <Compile Include="EvaluatorTests.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="TaskRuntimeTests.cs" /> </ItemGroup> <ItemGroup> <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj"> http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs new file mode 100644 index 0000000..44d691f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using NSubstitute; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Protobuf.ReefProtocol; +using Org.Apache.REEF.Common.Runtime.Evaluator; +using Org.Apache.REEF.Common.Runtime.Evaluator.Task; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Xunit; + +namespace Org.Apache.REEF.Evaluator.Tests +{ + public sealed class TaskRuntimeTests + { + [Fact] + public void TestTaskRuntimeFields() + { + var contextId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + var injector = GetInjector(contextId, taskId); + var taskRuntime = injector.GetInstance<TaskRuntime>(); + Assert.Equal(taskRuntime.TaskId, taskId); + Assert.Equal(taskRuntime.ContextId, contextId); + } + + [Fact] + public void TestTaskRuntimeInitialization() + { + var injector = GetInjector(); + var taskRuntime = injector.GetInstance<TaskRuntime>(); + Assert.Equal(taskRuntime.GetTaskState(), TaskState.Init); + Assert.False(taskRuntime.HasEnded()); + } + + [Fact] + public void TestTaskRuntimeRunsAndDisposesTask() + { + var injector = GetInjector(); + var taskRuntime = injector.GetInstance<TaskRuntime>(); + taskRuntime.RunTask(); + var task = injector.GetInstance<TestTask>(); + 
task.FinishCountdownEvent.Wait(); + task.DisposeCountdownEvent.Wait(); + Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done); + Assert.True(taskRuntime.HasEnded()); + } + + [Fact] + public void TestTaskRuntimeFailure() + { + var injector = GetInjector(typeof(ExceptionAction)); + var taskRuntime = injector.GetInstance<TaskRuntime>(); + taskRuntime.RunTask(); + var task = injector.GetInstance<TestTask>(); + task.DisposeCountdownEvent.Wait(); + Assert.Equal(taskRuntime.GetTaskState(), TaskState.Failed); + Assert.True(taskRuntime.HasEnded()); + } + + [Fact] + public void TestTaskLifeCycle() + { + var contextId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + + var injector = GetInjector(typeof(CountDownAction), contextId, taskId); + var taskRuntime = injector.GetInstance<TaskRuntime>(); + + var statusProto = taskRuntime.GetStatusProto(); + Assert.Equal(statusProto.task_id, taskId); + Assert.Equal(statusProto.context_id, contextId); + Assert.Equal(statusProto.state, State.INIT); + Assert.Equal(taskRuntime.GetTaskState(), TaskState.Init); + + taskRuntime.RunTask(); + Assert.Equal(taskRuntime.GetStatusProto().state, State.RUNNING); + Assert.Equal(taskRuntime.GetTaskState(), TaskState.Running); + + injector.GetInstance<CountDownAction>().CountdownEvent.Signal(); + + var task = injector.GetInstance<TestTask>(); + task.FinishCountdownEvent.Wait(); + task.DisposeCountdownEvent.Wait(); + + Assert.Equal(taskRuntime.GetStatusProto().state, State.DONE); + Assert.Equal(taskRuntime.GetTaskState(), TaskState.Done); + } + + private static IInjector GetInjector(string contextId = "contextId", string taskId = "taskId") + { + return GetInjector(typeof(DefaultAction), contextId, taskId); + } + + private static IInjector GetInjector(Type actionType, string contextId = "contextId", string taskId = "taskId") + { + var confBuilder = TangFactory.GetTang().NewConfigurationBuilder(); + var heartbeatManager = Substitute.For<IHeartBeatManager>(); + + var 
evaluatorConfig = confBuilder + .BindNamedParameter(typeof(ContextConfigurationOptions.ContextIdentifier), contextId) + .BindNamedParameter(typeof(TaskConfigurationOptions.Identifier), taskId) + .BindImplementation(typeof(ITask), typeof(TestTask)) + .BindImplementation(typeof(IAction), actionType) + .Build(); + + var injector = TangFactory.GetTang().NewInjector(evaluatorConfig); + injector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, heartbeatManager); + + return injector; + } + + private sealed class TestTask : ITask + { + private readonly IAction _action; + + [Inject] + private TestTask(IAction action) + { + FinishCountdownEvent = new CountdownEvent(1); + DisposeCountdownEvent = new CountdownEvent(1); + _action = action; + } + + public CountdownEvent FinishCountdownEvent { get; private set; } + + public CountdownEvent DisposeCountdownEvent { get; private set; } + + public void Dispose() + { + DisposeCountdownEvent.Signal(); + } + + public byte[] Call(byte[] memento) + { + try + { + _action.Value(); + } + finally + { + FinishCountdownEvent.Signal(); + } + + return null; + } + } + + private interface IAction + { + Action Value { get; } + } + + private sealed class DefaultAction : IAction + { + [Inject] + private DefaultAction() + { + } + + public Action Value + { + get + { + // NOOP + return () => { }; + } + } + } + + private sealed class ExceptionAction : IAction + { + [Inject] + private ExceptionAction() + { + } + + public Action Value + { + get + { + return () => + { + throw new Exception(); + }; + } + } + } + + private sealed class CountDownAction : IAction + { + [Inject] + private CountDownAction() + { + CountdownEvent = new CountdownEvent(1); + } + + public Action Value + { + get + { + return () => + { + CountdownEvent.Wait(); + }; + } + } + + public CountdownEvent CountdownEvent { get; private set; } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Evaluator.Tests/packages.config 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/packages.config b/lang/cs/Org.Apache.REEF.Evaluator.Tests/packages.config index df4e5ea..d7ef120 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/packages.config +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/packages.config @@ -1,4 +1,4 @@ -<?xml version="1.0" encoding="utf-8"?> +<?xml version="1.0" encoding="utf-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -18,6 +18,8 @@ specific language governing permissions and limitations under the License. --> <packages> + <package id="NSubstitute" version="1.8.2.0" targetFramework="net45" /> + <package id="protobuf-net" version="2.0.0.668" targetFramework="net45" /> <package id="StyleCop.MSBuild" version="4.7.49.1" targetFramework="net45" developmentDependency="true" /> <package id="xunit" version="2.1.0" targetFramework="net45" /> <package id="xunit.abstractions" version="2.0.0" targetFramework="net45" /> http://git-wip-us.apache.org/repos/asf/reef/blob/1cd8a09d/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs index 6477019..07e7a53 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator/Evaluator.cs @@ -151,6 +151,7 @@ namespace Org.Apache.REEF.Evaluator } } + // TODO[JIRA REEF-217]: Remove this method. private static IConfiguration ReadEvaluatorConfiguration(string evaluatorConfigFile) { if (string.IsNullOrWhiteSpace(evaluatorConfigFile))
