Repository: reef Updated Branches: refs/heads/master dc78d381f -> f4d40ca88
[REEF-1176] Add unit tests for ContextRuntime JIRA: [REEF-1176](https://issues.apache.org/jira/browse/REEF-1176) Pull Request: This closes #846 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/f4d40ca8 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/f4d40ca8 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/f4d40ca8 Branch: refs/heads/master Commit: f4d40ca88c32ed4f7df40908f14dfb081a72df05 Parents: dc78d38 Author: Andrew Chung <[email protected]> Authored: Wed Feb 17 17:53:00 2016 -0800 Committer: Markus Weimer <[email protected]> Committed: Fri Feb 19 18:35:36 2016 -0800 ---------------------------------------------------------------------- .../Runtime/Evaluator/Context/ContextManager.cs | 4 +- .../Runtime/Evaluator/Context/ContextRuntime.cs | 16 +- .../Runtime/Evaluator/Task/TaskRuntime.cs | 10 + .../ContextRuntimeTests.cs | 221 +++++++++++++++++++ 4 files changed, 245 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/f4d40ca8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs index ef35f02..7d16ed4 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextManager.cs @@ -75,7 +75,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context LOGGER.Log(Level.Info, "Launching the initial Task"); try { - _topContext.StartTask(_rootContextLauncher.RootTaskConfig.Value, _rootContextLauncher.Id, _heartBeatManager); + _topContext.StartTask(_rootContextLauncher.RootTaskConfig.Value, _heartBeatManager); } catch (TaskClientCodeException e) { @@ -348,7 +348,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } var configuration = _serializer.FromString(startTaskProto.configuration); - currentActiveContext.StartTask(configuration, expectedContextId, _heartBeatManager); + currentActiveContext.StartTask(configuration, _heartBeatManager); } } http://git-wip-us.apache.org/repos/asf/reef/blob/f4d40ca8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 36566f5..f0b9f75 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -146,6 +146,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// <summary> /// For testing only! /// </summary> + internal Optional<TaskRuntime> TaskRuntime + { + get { return _task; } + } + + /// <summary> + /// For testing only! + /// </summary> [Testing] internal IInjector ContextInjector { @@ -230,12 +238,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context } /// <summary> - /// Launches an Task on this context. + /// Launches an Task on this context. + /// TODO[JIRA REEF-217]: Remove heartBeatManager from argument. /// </summary> /// <param name="taskConfiguration"></param> - /// <param name="contextId"></param> /// <param name="heartBeatManager"></param> - public void StartTask(IConfiguration taskConfiguration, string contextId, IHeartBeatManager heartBeatManager) + public void StartTask(IConfiguration taskConfiguration, IHeartBeatManager heartBeatManager) { lock (_contextLifeCycle) { @@ -265,7 +273,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context var taskInjector = _contextInjector.ForkInjector(taskConfiguration); var taskRuntime = _deprecatedTaskStart - ? GetDeprecatedTaskRuntime(taskInjector, contextId, taskConfiguration, heartBeatManager) + ? GetDeprecatedTaskRuntime(taskInjector, Id, taskConfiguration, heartBeatManager) : taskInjector.GetInstance<TaskRuntime>(); try http://git-wip-us.apache.org/repos/asf/reef/blob/f4d40ca8/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 235eef7..6fa135b 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -26,6 +26,7 @@ using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task @@ -117,6 +118,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } /// <summary> + /// For testing only! + /// </summary> + [Testing] + internal ITask Task + { + get { return _userTask; } + } + + /// <summary> /// Runs the task asynchronously. /// </summary> public void RunTask() http://git-wip-us.apache.org/repos/asf/reef/blob/f4d40ca8/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs index 2adae71..854a52b 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs @@ -19,10 +19,16 @@ using System; using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; +using NSubstitute; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Common.Protobuf.ReefProtocol; +using Org.Apache.REEF.Common.Runtime.Evaluator; using Org.Apache.REEF.Common.Runtime.Evaluator.Context; using Org.Apache.REEF.Common.Services; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; @@ -149,6 +155,176 @@ namespace Org.Apache.REEF.Evaluator.Tests [Fact] [Trait("Priority", "0")] [Trait("Category", "Unit")] + public void TestContextStackingDoesNotGetSameInstance() + { + var serviceInjector = TangFactory.GetTang().NewInjector(); + var contextConfig = GetContextEventHandlerContextConfiguration(); + + using (var contextRuntime = new ContextRuntime(serviceInjector, contextConfig, Optional<ContextRuntime>.Empty())) + { + var childContextConfiguration = GetContextEventHandlerContextConfiguration(); + using (var childContextRuntime = contextRuntime.SpawnChildContext(childContextConfiguration)) + { + Assert.False(ReferenceEquals( + contextRuntime.ContextInjector.GetInstance<TestContextEventHandler>(), + childContextRuntime.ContextInjector.GetInstance<TestContextEventHandler>())); + } + } + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public void TestContextStackingParentContext() + { + var serviceInjector = TangFactory.GetTang().NewInjector(); + var contextConfig = GetSimpleContextConfiguration(); + + using (var contextRuntime = new ContextRuntime(serviceInjector, contextConfig, Optional<ContextRuntime>.Empty())) + { + var childContextConfiguration = GetSimpleContextConfiguration(); + using (var childContextRuntime = contextRuntime.SpawnChildContext(childContextConfiguration)) + { + Assert.False(contextRuntime.ParentContext.IsPresent()); + Assert.True(childContextRuntime.ParentContext.IsPresent()); + Assert.True(ReferenceEquals(contextRuntime, childContextRuntime.ParentContext.Value)); + } + } + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public void TestUnableToSpawnChildWhileTaskIsRunning() + { + var serviceInjector = TangFactory.GetTang().NewInjector(); + var contextConfig = GetSimpleContextConfiguration(); + + using (var contextRuntime = new ContextRuntime(serviceInjector, contextConfig, Optional<ContextRuntime>.Empty())) + { + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Task, GenericType<TestTask>.Class) + .Set(TaskConfiguration.Identifier, "ID") + .Build(); + + try + { + var hbMgr = Substitute.For<IHeartBeatManager>(); + contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); + contextRuntime.StartTask(taskConfig, hbMgr); + + Assert.True(contextRuntime.TaskRuntime.IsPresent()); + Assert.True(contextRuntime.GetTaskStatus().IsPresent()); + Assert.Equal(contextRuntime.GetTaskStatus().Value.state, State.RUNNING); + + var childContextConfiguration = GetSimpleContextConfiguration(); + + Assert.Throws<InvalidOperationException>( + () => contextRuntime.SpawnChildContext(childContextConfiguration)); + } + finally + { + var testTask = contextRuntime.TaskRuntime.Value.Task as TestTask; + if (testTask == null) + { + throw new Exception(); + } + + testTask.CountDownEvent.Signal(); + } + } + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public void TestUnableToRunMultipleTasksAtTheSameTime() + { + var serviceInjector = TangFactory.GetTang().NewInjector(); + var contextConfig = GetSimpleContextConfiguration(); + + using (var contextRuntime = new ContextRuntime(serviceInjector, contextConfig, Optional<ContextRuntime>.Empty())) + { + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Task, GenericType<TestTask>.Class) + .Set(TaskConfiguration.Identifier, "ID") + .Build(); + + try + { + var hbMgr = Substitute.For<IHeartBeatManager>(); + contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); + contextRuntime.StartTask(taskConfig, hbMgr); + + Assert.True(contextRuntime.TaskRuntime.IsPresent()); + Assert.True(contextRuntime.GetTaskStatus().IsPresent()); + Assert.Equal(contextRuntime.GetTaskStatus().Value.state, State.RUNNING); + + Assert.Throws<InvalidOperationException>( + () => contextRuntime.StartTask(taskConfig, hbMgr)); + } + finally + { + var testTask = contextRuntime.TaskRuntime.Value.Task as TestTask; + if (testTask == null) + { + throw new Exception(); + } + + testTask.CountDownEvent.Signal(); + } + } + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public void TestTwoSuccessiveTasksOnSameContext() + { + var serviceInjector = TangFactory.GetTang().NewInjector(); + var contextConfig = GetSimpleContextConfiguration(); + + using (var contextRuntime = new ContextRuntime(serviceInjector, contextConfig, Optional<ContextRuntime>.Empty())) + { + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Task, GenericType<TestTask>.Class) + .Set(TaskConfiguration.OnTaskStop, GenericType<TestTask>.Class) + .Set(TaskConfiguration.Identifier, "ID") + .Build(); + + var hbMgr = Substitute.For<IHeartBeatManager>(); + contextRuntime.ContextInjector.BindVolatileInstance(GenericType<IHeartBeatManager>.Class, hbMgr); + contextRuntime.StartTask(taskConfig, hbMgr); + var testTask = contextRuntime.TaskRuntime.Value.Task as TestTask; + if (testTask == null) + { + throw new Exception(); + } + + testTask.CountDownEvent.Signal(); + testTask.StopEvent.Wait(); + Assert.False(contextRuntime.GetTaskStatus().IsPresent()); + + contextRuntime.StartTask(taskConfig, hbMgr); + Assert.Equal(contextRuntime.GetTaskStatus().Value.state, State.RUNNING); + + var secondTestTask = contextRuntime.TaskRuntime.Value.Task as TestTask; + if (secondTestTask == null) + { + throw new Exception(); + } + + Assert.False(ReferenceEquals(testTask, secondTestTask)); + + secondTestTask.CountDownEvent.Signal(); + secondTestTask.StopEvent.Wait(); + Assert.False(contextRuntime.GetTaskStatus().IsPresent()); + } + } + + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] public void TestServiceStacking() { var serviceConfiguration = ServiceConfiguration.ConfigurationModule @@ -182,6 +358,11 @@ namespace Org.Apache.REEF.Evaluator.Tests } } + private static IConfiguration GetSimpleContextConfiguration() + { + return ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier, "ID").Build(); + } + private static IConfiguration GetContextEventHandlerContextConfiguration() { return ContextConfiguration.ConfigurationModule @@ -260,5 +441,45 @@ namespace Org.Apache.REEF.Evaluator.Tests throw new NotImplementedException(); } } + + private sealed class TestTask : ITask, IObserver<ITaskStop> + { + [Inject] + private TestTask() + { + CountDownEvent = new CountdownEvent(1); + StopEvent = new CountdownEvent(1); + } + + public CountdownEvent CountDownEvent { get; private set; } + + public CountdownEvent StopEvent { get; private set; } + + public void Dispose() + { + throw new NotImplementedException(); + } + + public byte[] Call(byte[] memento) + { + CountDownEvent.Wait(); + return null; + } + + public void OnNext(ITaskStop value) + { + StopEvent.Signal(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } } }
