Repository: reef Updated Branches: refs/heads/master 51220770e -> c371affbc
[REEF-1268] Complete ServiceConfiguration in REEF.NET This addressed the issue by * Adding OnTaskStarted, OnTaskStop, OnContextStarted, and OnContextStop to ServiceConfiguration. * Adding tests for the above functions. JIRA: [REEF-1268](https://issues.apache.org/jira/browse/REEF-1268) Pull Request: This closes #926 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/c371affb Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/c371affb Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/c371affb Branch: refs/heads/master Commit: c371affbcc6d31c7d5d8e266e81fe7462daebfcf Parents: 5122077 Author: Andrew Chung <[email protected]> Authored: Mon Apr 4 13:18:55 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu Apr 14 15:24:47 2016 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/Context/ContextRuntime.cs | 111 ++++- .../Runtime/Evaluator/Task/TaskRuntime.cs | 4 +- .../Services/ServiceConfiguration.cs | 21 + .../ContextRuntimeTests.cs | 116 +---- .../EvaluatorConfigurationsTests.cs | 5 +- .../EvaluatorServiceTests.cs | 454 +++++++++++++++++++ .../Org.Apache.REEF.Evaluator.Tests.csproj | 5 + .../TestUtils/ITestService.cs | 23 + .../TestUtils/SimpleTestTask.cs | 39 ++ .../TestUtils/TestService.cs | 38 ++ .../TestUtils/TestTask.cs | 68 +++ 11 files changed, 753 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 9901a6f..cd4b2d3 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -19,10 +19,13 @@ using System; using System.Collections.Generic; using System.Globalization; using System.Linq; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Runtime.Evaluator.Task; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; @@ -34,26 +37,64 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime)); - // Context-local injector. This contains information that will not be available in child injectors. + /// <summary> + /// Context-local injector. This contains information that will not be available in child injectors. + /// </summary> private readonly IInjector _contextInjector; - // Service injector. State in this injector moves to child injectors. + /// <summary> + /// Service injector. State in this injector moves to child injectors. + /// </summary> private readonly IInjector _serviceInjector; - // Convenience class to hold all the event handlers for the context as well as the service instances. + /// <summary> + /// Convenience class to hold all the event handlers for the context as well as the service instances. + /// </summary> private readonly ContextLifeCycle _contextLifeCycle; - // The parent context, if any. + /// <summary> + /// The parent context, if any. + /// </summary> private readonly Optional<ContextRuntime> _parentContext; - private readonly Optional<ISet<object>> _injectedServices; + /// <summary> + /// The service objects bound to ServiceConfiguration. 
+ /// </summary> + private readonly ISet<object> _injectedServices; + + /// <summary> + /// The ContextStart handlers bound to ServiceConfiguration. + /// </summary> + private readonly ISet<IObserver<IContextStart>> _serviceContextStartHandlers; - // The child context, if any. + /// <summary> + /// The ContextStop handlers bound to ServiceConfiguration. + /// </summary> + private readonly ISet<IObserver<IContextStop>> _serviceContextStopHandlers; + + /// <summary> + /// The TaskStart handlers bound to ServiceConfiguration. + /// </summary> + private readonly ISet<IObserver<ITaskStart>> _serviceTaskStartHandlers; + + /// <summary> + /// The TaskStop handlers bound to ServiceConfiguration. + /// </summary> + private readonly ISet<IObserver<ITaskStop>> _serviceTaskStopHandlers; + + /// <summary> + /// The child context, if any. + /// </summary> private Optional<ContextRuntime> _childContext = Optional<ContextRuntime>.Empty(); - // The currently running task, if any. + /// <summary> + /// The currently running task, if any. + /// </summary> private Optional<TaskRuntime> _task = Optional<TaskRuntime>.Empty(); + /// <summary> + /// Current state of the context. + /// </summary> private ContextStatusProto.State _contextState = ContextStatusProto.State.READY; /// <summary> @@ -68,7 +109,28 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context Optional<ContextRuntime> parentContext) { _serviceInjector = serviceInjector; - _injectedServices = Optional<ISet<object>>.Of(serviceInjector.GetNamedInstance<ServicesSet, ISet<object>>()); + + // Note that for Service objects and handlers, we are not merging them into a separate + // class (e.g. ServiceContainer) due to the inability to allow service stacking if an instance + // of such a class were to be materialized. i.e. 
if a ServiceContainer object were initialized + // and a child ServiceConfiguration is submitted, when the child service injector tries to + // get the relevant handlers and services set, it will get the same set of handlers as + // previously instantiated by the parent injector, and thus will not allow the stacking + // of ServiceConfigurations. + _injectedServices = serviceInjector.GetNamedInstance<ServicesSet, ISet<object>>(); + + _serviceContextStartHandlers = + serviceInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>(); + + _serviceContextStopHandlers = + serviceInjector.GetNamedInstance<ContextConfigurationOptions.StopHandlers, ISet<IObserver<IContextStop>>>(); + + _serviceTaskStartHandlers = + serviceInjector.GetNamedInstance<TaskConfigurationOptions.StartHandlers, ISet<IObserver<ITaskStart>>>(); + + _serviceTaskStopHandlers = + serviceInjector.GetNamedInstance<TaskConfigurationOptions.StopHandlers, ISet<IObserver<ITaskStop>>>(); + _contextInjector = serviceInjector.ForkInjector(contextConfiguration); _contextLifeCycle = _contextInjector.GetInstance<ContextLifeCycle>(); _parentContext = parentContext; @@ -89,12 +151,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// For testing only! /// </summary> [Testing] - internal Optional<ISet<object>> Services + internal ISet<object> Services { - get - { - return _injectedServices; - } + get { return _injectedServices; } } /// <summary> @@ -105,7 +164,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context get { return _task; } } - /// <summary> + /// <summary> /// For testing only! /// </summary> [Testing] @@ -118,6 +177,18 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } /// <summary> + /// For testing only! + /// </summary> + [Testing] + internal IInjector ServiceInjector + { + get + { + return _serviceInjector; + } + } + + /// <summary> /// Spawns a new context. 
/// The new context will have a serviceInjector that is created by forking the one in this object with the given /// serviceConfiguration. The contextConfiguration is used to fork the contextInjector from that new serviceInjector. @@ -195,7 +266,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// Launches an Task on this context. /// </summary> /// <param name="taskConfiguration"></param> - public void StartTask(IConfiguration taskConfiguration) + public System.Threading.Tasks.Task StartTask(IConfiguration taskConfiguration) { lock (_contextLifeCycle) { @@ -229,12 +300,13 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context try { _task = Optional<TaskRuntime>.Of(taskRuntime); - taskRuntime.RunTask(); + return taskRuntime.RunTask(); } catch (Exception e) { var ex = new TaskClientCodeException(string.Empty, Id, "Unable to run the new task", e); Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Task start error.", LOGGER); + return null; } } } @@ -264,12 +336,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context ParentContext.Value.ResetChildContext(); } - if (_injectedServices.IsPresent()) + foreach (var injectedService in _injectedServices.OfType<IDisposable>()) { - foreach (var injectedService in _injectedServices.Value.OfType<IDisposable>()) - { - injectedService.Dispose(); - } + injectedService.Dispose(); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 5cc22ec..573c0de 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -82,7 +82,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task /// 
<summary> /// Runs the task asynchronously. /// </summary> - public void RunTask() + public System.Threading.Tasks.Task RunTask() { if (Interlocked.Exchange(ref _taskRan, 1) != 0) { @@ -93,7 +93,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task // Send heartbeat such that user receives a TaskRunning message. _currentStatus.SetRunning(); - System.Threading.Tasks.Task.Run(() => + return System.Threading.Tasks.Task.Run(() => { Logger.Log(Level.Info, "Calling into user's task."); return _userTask.Call(null); http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs b/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs index 2aadc97..fe6500f 100644 --- a/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs +++ b/lang/cs/Org.Apache.REEF.Common/Services/ServiceConfiguration.cs @@ -15,8 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Util; @@ -39,12 +44,28 @@ namespace Org.Apache.REEF.Common.Services [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] public static readonly OptionalParameter<object> Services = new OptionalParameter<object>(); + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IContextStart>> OnContextStarted = new OptionalImpl<IObserver<IContextStart>>(); + + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<IContextStop>> OnContextStop = new OptionalImpl<IObserver<IContextStop>>(); + + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ITaskStart>> OnTaskStarted = new OptionalImpl<IObserver<ITaskStart>>(); + + [SuppressMessage("Microsoft.Security", "CA2104:Do not declare read only mutable reference types", Justification = "not applicable")] + public static readonly OptionalImpl<IObserver<ITaskStop>> OnTaskStop = new OptionalImpl<IObserver<ITaskStop>>(); + public static ConfigurationModule ConfigurationModule { get { return new ServiceConfiguration() .BindSetEntry(GenericType<ServicesSet>.Class, Services) + .BindSetEntry(GenericType<ContextConfigurationOptions.StartHandlers>.Class, OnContextStarted) + 
.BindSetEntry(GenericType<ContextConfigurationOptions.StopHandlers>.Class, OnContextStop) + .BindSetEntry(GenericType<TaskConfigurationOptions.StartHandlers>.Class, OnTaskStarted) + .BindSetEntry(GenericType<TaskConfigurationOptions.StopHandlers>.Class, OnTaskStop) .Build(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs index 9a2ff96..484f5a4 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs @@ -20,6 +20,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; +using System.Threading.Tasks; using NSubstitute; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Events; @@ -29,6 +30,7 @@ using Org.Apache.REEF.Common.Runtime.Evaluator.Context; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Evaluator.Tests.TestUtils; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; @@ -90,40 +92,6 @@ namespace Org.Apache.REEF.Evaluator.Tests [Fact] [Trait("Priority", "0")] [Trait("Category", "Unit")] - public void TestServiceInstantiatedAndDisposed() - { - var serviceConfiguration = ServiceConfiguration.ConfigurationModule - .Set(ServiceConfiguration.Services, GenericType<TestService>.Class) - .Build(); - - var serviceInjector = TangFactory.GetTang().NewInjector(serviceConfiguration); - var contextConfig = GetContextEventHandlerContextConfiguration(); - - TestService testService; - using (var contextRuntime = new ContextRuntime(serviceInjector, contextConfig, 
Optional<ContextRuntime>.Empty())) - { - var servicesFromInjector = serviceInjector.GetNamedInstance<ServicesSet, ISet<object>>(); - testService = servicesFromInjector.Single() as TestService; - Assert.NotNull(testService); - if (testService == null) - { - // Not possible - return; - } - - var testServiceFromInjector = serviceInjector.GetInstance<TestService>(); - Assert.True(ReferenceEquals(testService, testServiceFromInjector)); - - var contextTestService = contextRuntime.ContextInjector.GetInstance<TestService>(); - Assert.True(ReferenceEquals(contextTestService, testServiceFromInjector)); - } - - Assert.True(testService.Disposed); - } - - [Fact] - [Trait("Priority", "0")] - [Trait("Category", "Unit")] public void TestBaseServiceWithContextStacking() { var serviceConfiguration = ServiceConfiguration.ConfigurationModule @@ -239,7 +207,7 @@ namespace Org.Apache.REEF.Evaluator.Tests [Fact] [Trait("Priority", "0")] [Trait("Category", "Unit")] - public void TestUnableToRunMultipleTasksAtTheSameTime() + public async Task TestUnableToRunMultipleTasksAtTheSameTime() { var serviceInjector = TangFactory.GetTang().NewInjector(); var contextConfig = GetSimpleContextConfiguration(); @@ -255,13 +223,13 @@ namespace Org.Apache.REEF.Evaluator.Tests { var hbMgr = Substitute.For<IHeartBeatManager>(); contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); - contextRuntime.StartTask(taskConfig); + var t = contextRuntime.StartTask(taskConfig); Assert.True(contextRuntime.TaskRuntime.IsPresent()); Assert.True(contextRuntime.GetTaskStatus().IsPresent()); Assert.Equal(contextRuntime.GetTaskStatus().Value.state, State.RUNNING); - Assert.Throws<InvalidOperationException>( + await Assert.ThrowsAsync<InvalidOperationException>( () => contextRuntime.StartTask(taskConfig)); } finally @@ -351,13 +319,13 @@ namespace Org.Apache.REEF.Evaluator.Tests using (var childContextRuntime = contextRuntime.SpawnChildContext(childContextConfiguration, 
childServiceConfiguration)) { // Check that parent service injector does not contain instances of SecondTestService - Assert.False(contextRuntime.Services.Value.OfType<SecondTestService>().Any()); - Assert.True(childContextRuntime.Services.Value.OfType<TestService>().Count() == 1); - Assert.True(childContextRuntime.Services.Value.OfType<SecondTestService>().Count() == 1); - Assert.True(childContextRuntime.Services.Value.Count() == 2); + Assert.False(contextRuntime.Services.OfType<SecondTestService>().Any()); + Assert.True(childContextRuntime.Services.OfType<TestService>().Count() == 1); + Assert.True(childContextRuntime.Services.OfType<SecondTestService>().Count() == 1); + Assert.True(childContextRuntime.Services.Count() == 2); Assert.True(ReferenceEquals( childContextRuntime.ContextInjector.GetInstance<TestService>(), - contextRuntime.Services.Value.OfType<TestService>().Single())); + contextRuntime.Services.OfType<TestService>().Single())); } } } @@ -377,27 +345,6 @@ namespace Org.Apache.REEF.Evaluator.Tests .Build(); } - private interface ITestService - { - // empty - } - - internal sealed class TestService : ITestService, IDisposable - { - [Inject] - private TestService() - { - Disposed = false; - } - - public bool Disposed { get; private set; } - - public void Dispose() - { - Disposed = true; - } - } - private sealed class SecondTestService { [Inject] @@ -445,48 +392,5 @@ namespace Org.Apache.REEF.Evaluator.Tests throw new NotImplementedException(); } } - - private sealed class TestTask : ITask, IObserver<ITaskStop> - { - [Inject] - private TestTask() - { - CountDownEvent = new CountdownEvent(1); - StopEvent = new CountdownEvent(1); - DisposedEvent = new CountdownEvent(1); - } - - public CountdownEvent CountDownEvent { get; private set; } - - public CountdownEvent StopEvent { get; private set; } - - public CountdownEvent DisposedEvent { get; private set; } - - public void Dispose() - { - DisposedEvent.Signal(); - } - - public byte[] Call(byte[] memento) - { - 
CountDownEvent.Wait(); - return null; - } - - public void OnNext(ITaskStop value) - { - StopEvent.Signal(); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } } } http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs index 7eade73..67aff01 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorConfigurationsTests.cs @@ -20,6 +20,7 @@ using Org.Apache.REEF.Common.Evaluator; using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Evaluator.Tests.TestUtils; using Org.Apache.REEF.Examples.HelloREEF; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Formats.AvroConfigurationDataContract; @@ -121,7 +122,7 @@ namespace Org.Apache.REEF.Evaluator.Tests Assert.True(contextId.StartsWith(ContextIdPrefix)); var serviceInjector = contextInjector.ForkInjector(serviceConfig); - var service = serviceInjector.GetInstance<ContextRuntimeTests.TestService>(); + var service = serviceInjector.GetInstance<TestService>(); Assert.NotNull(service); var taskInjector = serviceInjector.ForkInjector(taskConfig); @@ -189,7 +190,7 @@ namespace Org.Apache.REEF.Evaluator.Tests serializer.ToString(contextConfig))); var serviceConfiguration = ServiceConfiguration.ConfigurationModule - .Set(ServiceConfiguration.Services, GenericType<ContextRuntimeTests.TestService>.Class) + .Set(ServiceConfiguration.Services, GenericType<TestService>.Class) .Build(); configurationEntries.Add( new 
ConfigurationEntry("org.apache.reef.runtime.common.evaluator.parameters.RootServiceConfiguration", http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs new file mode 100644 index 0000000..e38b3a8 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/EvaluatorServiceTests.cs @@ -0,0 +1,454 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using NSubstitute; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Common.Runtime.Evaluator; +using Org.Apache.REEF.Common.Runtime.Evaluator.Context; +using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; +using Org.Apache.REEF.Common.Services; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Evaluator.Tests.TestUtils; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities; +using Xunit; + +namespace Org.Apache.REEF.Evaluator.Tests +{ + public sealed class EvaluatorServiceTests + { + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public void TestServiceInstantiatedAndDisposed() + { + var serviceConfiguration = ServiceConfiguration.ConfigurationModule + .Set(ServiceConfiguration.Services, GenericType<TestService>.Class) + .Build(); + + var serviceInjector = TangFactory.GetTang().NewInjector(serviceConfiguration); + var contextConfig = GetContextConfiguration(); + + TestService testService; + using (var contextRuntime = new ContextRuntime(serviceInjector, contextConfig, Optional<ContextRuntime>.Empty())) + { + var servicesFromInjector = serviceInjector.GetNamedInstance<ServicesSet, ISet<object>>(); + testService = servicesFromInjector.Single() as TestService; + Assert.NotNull(testService); + if (testService == null) + { + // Not possible + return; + } + + var testServiceFromInjector = serviceInjector.GetInstance<TestService>(); + Assert.True(ReferenceEquals(testService, testServiceFromInjector)); + + var contextTestService = contextRuntime.ContextInjector.GetInstance<TestService>(); + Assert.True(ReferenceEquals(contextTestService, testServiceFromInjector)); + } + + 
Assert.True(testService.Disposed); + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public void TestServiceContextEventHandlersTriggered() + { + var launcher = GetRootContextLauncher( + GetContextConfiguration(), GetServiceConfiguration(), Optional<IConfiguration>.Empty()); + + IInjector serviceInjector = null; + IInjector contextInjector = null; + + using (var rootContext = launcher.GetRootContext()) + { + serviceInjector = rootContext.ServiceInjector; + contextInjector = rootContext.ContextInjector; + Assert.NotNull(serviceInjector); + Assert.NotNull(contextInjector); + } + + var serviceContextStartHandlers = + serviceInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>(); + + var contextContextStartHandlers = + contextInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>(); + + Assert.Equal(1, serviceContextStartHandlers.Count); + Assert.Equal(2, contextContextStartHandlers.Count); + + var serviceContextStartHandler = serviceContextStartHandlers.First() as TestServiceEventHandlers; + + Assert.True(contextContextStartHandlers.Contains(serviceContextStartHandler)); + + var serviceContextStopHandlers = + serviceInjector.GetNamedInstance<ContextConfigurationOptions.StopHandlers, ISet<IObserver<IContextStop>>>(); + + var contextContextStopHandlers = + contextInjector.GetNamedInstance<ContextConfigurationOptions.StopHandlers, ISet<IObserver<IContextStop>>>(); + + Assert.Equal(1, serviceContextStopHandlers.Count); + Assert.Equal(2, contextContextStopHandlers.Count); + + var serviceContextStopHandler = serviceContextStopHandlers.First() as TestServiceEventHandlers; + + Assert.True(contextContextStopHandlers.Contains(serviceContextStopHandler)); + + foreach (var contextStartHandler in contextContextStartHandlers.Select(h => h as ITestContextEventHandler)) + { + Assert.NotNull(contextStartHandler); + Assert.Equal(1, 
contextStartHandler.ContextStartInvoked); + Assert.Equal(1, contextStartHandler.ContextStopInvoked); + } + + foreach (var contextStopHandler in contextContextStopHandlers.Select(h => h as ITestContextEventHandler)) + { + Assert.NotNull(contextStopHandler); + Assert.Equal(1, contextStopHandler.ContextStartInvoked); + Assert.Equal(1, contextStopHandler.ContextStopInvoked); + } + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public void TestServiceContextEventHandlersTriggeredSuccessiveContexts() + { + var launcher = GetRootContextLauncher( + GetContextConfiguration(), GetServiceConfiguration(), Optional<IConfiguration>.Empty()); + + IInjector serviceInjector = null; + IInjector firstContextInjector = null; + IInjector secondContextInjector = null; + + using (var rootContext = launcher.GetRootContext()) + { + serviceInjector = rootContext.ServiceInjector; + firstContextInjector = rootContext.ContextInjector; + using (var childContext = rootContext.SpawnChildContext(GetContextConfiguration())) + { + secondContextInjector = childContext.ContextInjector; + } + + Assert.NotNull(serviceInjector); + Assert.NotNull(firstContextInjector); + Assert.NotNull(secondContextInjector); + } + + var serviceContextStartHandlers = + serviceInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>(); + + var firstContextContextStartHandlers = + firstContextInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>(); + + var secondContextContextStartHandlers = + secondContextInjector.GetNamedInstance<ContextConfigurationOptions.StartHandlers, ISet<IObserver<IContextStart>>>(); + + Assert.Equal(1, serviceContextStartHandlers.Count); + Assert.Equal(2, firstContextContextStartHandlers.Count); + Assert.Equal(2, secondContextContextStartHandlers.Count); + + var intersectSet = new HashSet<IObserver<IContextStart>>(serviceContextStartHandlers); + 
intersectSet.IntersectWith(firstContextContextStartHandlers); + intersectSet.IntersectWith(secondContextContextStartHandlers); + + var unionSet = new HashSet<IObserver<IContextStart>>(serviceContextStartHandlers); + unionSet.UnionWith(firstContextContextStartHandlers); + unionSet.UnionWith(secondContextContextStartHandlers); + + Assert.Equal(1, intersectSet.Count); + Assert.Equal(3, unionSet.Count); + + var serviceContextHandler = serviceContextStartHandlers.Single() as ITestContextEventHandler; + var unionContextHandlerSet = new HashSet<ITestContextEventHandler>( + unionSet.Select(h => h as ITestContextEventHandler).Where(h => h != null)); + + Assert.Equal(unionSet.Count, unionContextHandlerSet.Count); + Assert.True(unionContextHandlerSet.Contains(serviceContextHandler)); + + foreach (var handler in unionContextHandlerSet.Where(h => h != null)) + { + if (ReferenceEquals(handler, serviceContextHandler)) + { + Assert.Equal(2, handler.ContextStartInvoked); + Assert.Equal(2, handler.ContextStopInvoked); + } + else + { + Assert.Equal(1, handler.ContextStartInvoked); + Assert.Equal(1, handler.ContextStopInvoked); + } + } + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public async Task TestServiceTaskEventHandlersTriggered() + { + await RunTasksAndVerifyEventHandlers(1); + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public async Task TestServiceTaskEventHandlersTriggeredSuccessiveTasks() + { + await RunTasksAndVerifyEventHandlers(5); + } + + private static async Task RunTasksAndVerifyEventHandlers(int tasksRun) + { + var launcher = GetRootContextLauncher( + GetContextConfiguration(), GetServiceConfiguration(), Optional<IConfiguration>.Of(GetTaskConfiguration())); + + IInjector serviceInjector = null; + + using (var rootContext = launcher.GetRootContext()) + { + serviceInjector = rootContext.ServiceInjector; + for (var i = 0; i < tasksRun; i++) + { + await rootContext.StartTask(launcher.RootTaskConfig.Value); + } + + 
Assert.NotNull(serviceInjector); + } + + var serviceTaskStartHandlers = + serviceInjector.GetNamedInstance<TaskConfigurationOptions.StartHandlers, ISet<IObserver<ITaskStart>>>(); + + Assert.Equal(1, serviceTaskStartHandlers.Count); + + var serviceTaskStartHandler = serviceTaskStartHandlers.First() as TestServiceEventHandlers; + + var serviceTaskStopHandlers = + serviceInjector.GetNamedInstance<TaskConfigurationOptions.StopHandlers, ISet<IObserver<ITaskStop>>>(); + + Assert.Equal(1, serviceTaskStopHandlers.Count); + + var serviceTaskStopHandler = serviceTaskStopHandlers.First() as TestServiceEventHandlers; + + Assert.Equal(serviceTaskStopHandler, serviceTaskStartHandler); + + Assert.NotNull(serviceTaskStartHandler); + + if (serviceTaskStartHandler == null || serviceTaskStopHandler == null) + { + // Get rid of warning. + throw new Exception(); + } + + Assert.Equal(tasksRun, serviceTaskStartHandler.TaskStartInvoked); + Assert.Equal(tasksRun, serviceTaskStopHandler.TaskStopInvoked); + } + + private static RootContextLauncher GetRootContextLauncher( + IConfiguration contextConfig, IConfiguration serviceConfig, Optional<IConfiguration> taskConfig) + { + var injector = TangFactory.GetTang().NewInjector(); + var serializer = injector.GetInstance<AvroConfigurationSerializer>(); + var contextConfigStr = serializer.ToString(contextConfig); + var serviceConfigStr = serializer.ToString(serviceConfig); + var taskConfigStr = Optional<string>.Empty(); + if (taskConfig.IsPresent()) + { + taskConfigStr = Optional<string>.Of(serializer.ToString(taskConfig.Value)); + } + + var contextLauncherConfigBuilder = TangFactory.GetTang().NewConfigurationBuilder() + .BindNamedParameter<RootContextConfiguration, string>(GenericType<RootContextConfiguration>.Class, contextConfigStr) + .BindNamedParameter<RootServiceConfiguration, string>(GenericType<RootServiceConfiguration>.Class, serviceConfigStr); + + if (taskConfigStr.IsPresent()) + { + contextLauncherConfigBuilder = 
contextLauncherConfigBuilder + .BindNamedParameter<InitialTaskConfiguration, string>(GenericType<InitialTaskConfiguration>.Class, taskConfigStr.Value); + } + + injector = injector.ForkInjector(contextLauncherConfigBuilder.Build()); + var heartbeatManager = Substitute.For<IHeartBeatManager>(); + injector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, heartbeatManager); + return injector.GetInstance<RootContextLauncher>(); + } + + private static IConfiguration GetTaskConfiguration() + { + return TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "ID") + .Set(TaskConfiguration.Task, GenericType<SimpleTestTask>.Class) + .Set(TaskConfiguration.OnTaskStart, GenericType<TestTaskEventHandlers>.Class) + .Set(TaskConfiguration.OnTaskStop, GenericType<TestTaskEventHandlers>.Class) + .Build(); + } + + private static IConfiguration GetContextConfiguration() + { + return ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, "ID") + .Set(ContextConfiguration.OnContextStart, GenericType<TestContextEventHandlers>.Class) + .Set(ContextConfiguration.OnContextStop, GenericType<TestContextEventHandlers>.Class) + .Build(); + } + + private static IConfiguration GetServiceConfiguration() + { + return ServiceConfiguration.ConfigurationModule + .Set(ServiceConfiguration.OnContextStarted, GenericType<TestServiceEventHandlers>.Class) + .Set(ServiceConfiguration.OnContextStop, GenericType<TestServiceEventHandlers>.Class) + .Set(ServiceConfiguration.OnTaskStarted, GenericType<TestServiceEventHandlers>.Class) + .Set(ServiceConfiguration.OnTaskStop, GenericType<TestServiceEventHandlers>.Class) + .Build(); + } + + private interface ITestContextEventHandler : IObserver<IContextStart>, IObserver<IContextStop> + { + int ContextStartInvoked { get; } + + int ContextStopInvoked { get; } + } + + private interface ITestTaskEventHandler : IObserver<ITaskStart>, IObserver<ITaskStop> + { + int TaskStartInvoked { get; } + + int TaskStopInvoked { 
get; } + } + + private sealed class TestServiceEventHandlers : ITestContextEventHandler, ITestTaskEventHandler + { + public int ContextStartInvoked { get; private set; } + + public int ContextStopInvoked { get; private set; } + + public int TaskStartInvoked { get; private set; } + + public int TaskStopInvoked { get; private set; } + + [Inject] + private TestServiceEventHandlers() + { + } + + public void OnNext(IContextStart value) + { + ContextStartInvoked++; + } + + public void OnNext(IContextStop value) + { + ContextStopInvoked++; + } + + public void OnNext(ITaskStart value) + { + TaskStartInvoked++; + } + + public void OnNext(ITaskStop value) + { + TaskStopInvoked++; + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private class TestContextEventHandlers : ITestContextEventHandler + { + public int ContextStartInvoked { get; private set; } + + public int ContextStopInvoked { get; private set; } + + [Inject] + private TestContextEventHandlers() + { + } + + public void OnNext(IContextStart value) + { + ContextStartInvoked++; + } + + public void OnNext(IContextStop value) + { + ContextStopInvoked++; + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private class TestTaskEventHandlers : ITestTaskEventHandler + { + public int TaskStartInvoked { get; private set; } + + public int TaskStopInvoked { get; private set; } + + [Inject] + private TestTaskEventHandlers() + { + } + + public void OnNext(ITaskStart value) + { + TaskStartInvoked++; + } + + public void OnNext(ITaskStop value) + { + TaskStopInvoked++; + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj b/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj index 673fea1..e4b0028 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj @@ -53,7 +53,12 @@ under the License. <Compile Include="ContextRuntimeTests.cs" /> <Compile Include="EvaluatorConfigurationsTests.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> + <Compile Include="EvaluatorServiceTests.cs" /> <Compile Include="TaskRuntimeTests.cs" /> + <Compile Include="TestUtils\ITestService.cs" /> + <Compile Include="TestUtils\SimpleTestTask.cs" /> + <Compile Include="TestUtils\TestService.cs" /> + <Compile Include="TestUtils\TestTask.cs" /> </ItemGroup> <ItemGroup> <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj"> http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/ITestService.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/ITestService.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/ITestService.cs new file mode 100644 index 0000000..e6e05b3 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/ITestService.cs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+namespace Org.Apache.REEF.Evaluator.Tests.TestUtils
+{
+    /// <summary>
+    /// Service interface used by the Evaluator tests. It declares no members.
+    /// </summary>
+    internal interface ITestService
+    {
+        // Intentionally left without members.
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/SimpleTestTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/SimpleTestTask.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/SimpleTestTask.cs
new file mode 100644
index 0000000..0fde90f
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/SimpleTestTask.cs
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Evaluator.Tests.TestUtils
+{
+    /// <summary>
+    /// Minimal <see cref="ITask"/> for tests: it performs no work and holds no resources.
+    /// </summary>
+    internal sealed class SimpleTestTask : ITask
+    {
+        [Inject]
+        private SimpleTestTask()
+        {
+        }
+
+        /// <summary>
+        /// Returns immediately without reading the memento.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            return null;
+        }
+
+        /// <summary>
+        /// Does nothing; this task has nothing to release.
+        /// </summary>
+        public void Dispose()
+        {
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/TestService.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/TestService.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/TestService.cs
new file mode 100644
index 0000000..e0b77e7
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/TestService.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Evaluator.Tests.TestUtils
+{
+    /// <summary>
+    /// <see cref="ITestService"/> implementation that records whether
+    /// <see cref="Dispose"/> has been called.
+    /// </summary>
+    internal sealed class TestService : ITestService, IDisposable
+    {
+        // Starts out false (the default for bool) and is flipped by Dispose.
+        private bool _disposed;
+
+        [Inject]
+        private TestService()
+        {
+        }
+
+        /// <summary>
+        /// False after construction; true once <see cref="Dispose"/> has run.
+        /// </summary>
+        public bool Disposed
+        {
+            get { return _disposed; }
+        }
+
+        /// <summary>
+        /// Marks this service as disposed.
+        /// </summary>
+        public void Dispose()
+        {
+            _disposed = true;
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/c371affb/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/TestTask.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/TestTask.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/TestTask.cs
new file mode 100644
index 0000000..9912b70
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TestUtils/TestTask.cs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Threading;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Evaluator.Tests.TestUtils
+{
+    /// <summary>
+    /// Test task whose <see cref="Call"/> blocks on <see cref="CountDownEvent"/>.
+    /// Task stop and disposal are reported by signaling <see cref="StopEvent"/>
+    /// and <see cref="DisposedEvent"/>, so a test can wait on them.
+    /// </summary>
+    internal sealed class TestTask : ITask, IObserver<ITaskStop>
+    {
+        // Every event starts with a count of one, so a single Signal() releases waiters.
+        private readonly CountdownEvent _countDownEvent = new CountdownEvent(1);
+        private readonly CountdownEvent _stopEvent = new CountdownEvent(1);
+        private readonly CountdownEvent _disposedEvent = new CountdownEvent(1);
+
+        [Inject]
+        private TestTask()
+        {
+        }
+
+        /// <summary>
+        /// Event that <see cref="Call"/> waits on. Nothing in this class signals it.
+        /// </summary>
+        public CountdownEvent CountDownEvent
+        {
+            get { return _countDownEvent; }
+        }
+
+        /// <summary>
+        /// Event signaled when an <see cref="ITaskStop"/> is observed.
+        /// </summary>
+        public CountdownEvent StopEvent
+        {
+            get { return _stopEvent; }
+        }
+
+        /// <summary>
+        /// Event signaled by <see cref="Dispose"/>.
+        /// </summary>
+        public CountdownEvent DisposedEvent
+        {
+            get { return _disposedEvent; }
+        }
+
+        /// <summary>
+        /// Blocks until <see cref="CountDownEvent"/> is set, then returns.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            _countDownEvent.Wait();
+            return null;
+        }
+
+        /// <summary>
+        /// Signals <see cref="StopEvent"/>.
+        /// </summary>
+        public void OnNext(ITaskStop value)
+        {
+            _stopEvent.Signal();
+        }
+
+        /// <summary>
+        /// Signals <see cref="DisposedEvent"/>; the events themselves are not disposed here.
+        /// </summary>
+        public void Dispose()
+        {
+            _disposedEvent.Signal();
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+}
\ No newline at end of file
