Repository: reef Updated Branches: refs/heads/master b5386f77e -> b45c29c5e
[REEF-1209] Implement SuspendTask on the Evaluator JIRA: [REEF-1209](https://issues.apache.org/jira/browse/REEF-1209) This closes #847 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/b45c29c5 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/b45c29c5 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/b45c29c5 Branch: refs/heads/master Commit: b45c29c5e14c55781ee0e22627985e5626db8e23 Parents: b5386f7 Author: Andrew Chung <[email protected]> Authored: Thu Feb 18 15:26:48 2016 -0800 Committer: Julia Wang <[email protected]> Committed: Wed Feb 24 14:20:35 2016 -0800 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 2 + .../Runtime/Evaluator/Task/SuspendEventImpl.cs | 24 +-- .../Runtime/Evaluator/Task/TaskRuntime.cs | 17 ++- .../Tasks/Defaults/DefaultSuspendHandler.cs | 51 +++++++ .../Tasks/Events/ISuspendEvent.cs | 9 ++ .../Exceptions/TaskSuspendHandlerException.cs | 43 ++++++ .../Tasks/TaskConfigurationOptions.cs | 3 +- .../TaskRuntimeTests.cs | 150 ++++++++++++++++++- 8 files changed, 276 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 22f5408..de1663f 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -160,6 +160,7 @@ under the License. <Compile Include="Services\ServiceConfiguration.cs" /> <Compile Include="Tasks\Defaults\DefaultDriverConnectionMessageHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultDriverMessageHandler.cs" /> + <Compile Include="Tasks\Defaults\DefaultSuspendHandler.cs" /> <Compile Include="Tasks\Defaults\DefaultTaskMessageSource.cs" /> <Compile Include="Tasks\DriverConnectionState.cs" /> <Compile Include="Tasks\DriverConnectionMessageImpl.cs" /> @@ -168,6 +169,7 @@ under the License. <Compile Include="Tasks\Events\ISuspendEvent.cs" /> <Compile Include="Tasks\Events\ITaskStart.cs" /> <Compile Include="Tasks\Events\ITaskStop.cs" /> + <Compile Include="Tasks\Exceptions\TaskSuspendHandlerException.cs" /> <Compile Include="Tasks\IDriverConnectionMessageHandler.cs" /> <Compile Include="Tasks\IDriverMessageHandler.cs" /> <Compile Include="Tasks\IDriverConnectionMessage.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs index 2157d46..5033ecf 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/SuspendEventImpl.cs @@ -20,27 +20,17 @@ using Org.Apache.REEF.Utilities; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { - internal sealed class SuspendEventImpl : ICloseEvent + /// <summary> + /// The implementation of an event from the driver signaling + /// that a task should be suspended. + /// </summary> + internal sealed class SuspendEventImpl : ISuspendEvent { - public SuspendEventImpl() - { - Value = Optional<byte[]>.Empty(); - } - public SuspendEventImpl(byte[] bytes) { - Value = Optional<byte[]>.OfNullable(bytes); - } - - public Optional<byte[]> Value - { - get { return Value; } - set { value = Value; } + Message = Optional<byte[]>.OfNullable(bytes); } - public override string ToString() - { - return "SuspendEvent{value=" + Value.ToString() + "}"; - } + public Optional<byte[]> Message { get; private set; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 6fa135b..ca86f32 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -22,8 +22,10 @@ using System.Threading; using Org.Apache.REEF.Common.Protobuf.ReefProtocol; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Common.Tasks.Exceptions; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; +using Org.Apache.REEF.Tang.Implementations.InjectionPlan; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; @@ -39,6 +41,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task private readonly Optional<IDriverConnectionMessageHandler> _driverConnectionMessageHandler; private readonly Optional<IDriverMessageHandler> _driverMessageHandler; private readonly ITask _userTask; + private readonly IInjectionFuture<IObserver<ISuspendEvent>> _suspendHandlerFuture; private int _taskRan = 0; [Inject] @@ -46,12 +49,14 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task ITask userTask, IDriverMessageHandler driverMessageHandler, IDriverConnectionMessageHandler driverConnectionMessageHandler, - TaskStatus taskStatus) + TaskStatus taskStatus, + [Parameter(typeof(TaskConfigurationOptions.SuspendHandler))] IInjectionFuture<IObserver<ISuspendEvent>> suspendHandlerFuture) { _currentStatus = taskStatus; _driverMessageHandler = Optional<IDriverMessageHandler>.Of(driverMessageHandler); _driverConnectionMessageHandler = Optional<IDriverConnectionMessageHandler>.Of(driverConnectionMessageHandler); _userTask = userTask; + _suspendHandlerFuture = suspendHandlerFuture; } /// <summary> @@ -274,7 +279,15 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void OnNext(ISuspendEvent value) { Logger.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)"); - //// TODO: send a heartbeat + try + { + _suspendHandlerFuture.Get().OnNext(value); + } + catch (Exception ex) + { + var suspendEx = new TaskSuspendHandlerException("Unable to suspend task.", ex); + Utilities.Diagnostics.Exceptions.CaughtAndThrow(suspendEx, Level.Error, Logger); + } } public void OnNext(IDriverMessage value) http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultSuspendHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultSuspendHandler.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultSuspendHandler.cs new file mode 100644 index 0000000..18bc5c1 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Defaults/DefaultSuspendHandler.cs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Common.Tasks.Defaults +{ + /// <summary> + /// A default handler for an event from the driver signaling the suspension + /// of a task. Throws an exception by default, since a task should not have received + /// a suspension event if the handler is not bound explicitly. + /// </summary> + internal sealed class DefaultSuspendHandler : IObserver<ISuspendEvent> + { + [Inject] + private DefaultSuspendHandler() + { + } + + public void OnNext(ISuspendEvent value) + { + throw new Exception("No handler for suspend registered."); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs index 5fef335..2ad9326 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Events/ISuspendEvent.cs @@ -15,9 +15,18 @@ // specific language governing permissions and limitations // under the License. +using Org.Apache.REEF.Utilities; + namespace Org.Apache.REEF.Common.Tasks.Events { + /// <summary> + /// An event from the driver signaling that a task should be suspended. + /// </summary> public interface ISuspendEvent { + /// <summary> + /// The optional suspension message from the driver. + /// </summary> + Optional<byte[]> Message { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs new file mode 100644 index 0000000..717c399 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; + +namespace Org.Apache.REEF.Common.Tasks.Exceptions +{ + /// <summary> + /// An exception that is thrown when the task suspension event + /// handler is not bound. + /// </summary> + internal sealed class TaskSuspendHandlerException : Exception + { + public TaskSuspendHandlerException(string message) + : base(message) + { + } + + public TaskSuspendHandlerException(Exception innerException) + : base(innerException.Message, innerException) + { + } + + public TaskSuspendHandlerException(string message, Exception innerException) + : base(message, innerException) + { + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs index 3842fb7..30dce1a 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/TaskConfigurationOptions.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; +using Org.Apache.REEF.Common.Tasks.Defaults; using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.Tang.Annotations; @@ -54,7 +55,7 @@ namespace Org.Apache.REEF.Common.Tasks { } - [NamedParameter(documentation: "The event handler that receives the suspend event.")] + [NamedParameter(documentation: "The event handler that receives the suspend event.", defaultClass: typeof(DefaultSuspendHandler))] public class SuspendHandler : Name<IObserver<ISuspendEvent>> { } http://git-wip-us.apache.org/repos/asf/reef/blob/b45c29c5/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs index 0435357..56add09 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/TaskRuntimeTests.cs @@ -26,6 +26,7 @@ using Org.Apache.REEF.Common.Runtime.Evaluator; using Org.Apache.REEF.Common.Runtime.Evaluator.Task; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -123,7 +124,15 @@ namespace Org.Apache.REEF.Evaluator.Tests injector.GetInstance<CountDownAction>().CountdownEvent.Signal(); - var task = injector.GetInstance<TestTask>(); + var taskInterface = injector.GetInstance<ITask>(); + Assert.True(taskInterface is TestTask); + + var task = taskInterface as TestTask; + if (task == null) + { + throw new Exception("Task is expected to be an instance of TestTask."); + } + task.FinishCountdownEvent.Wait(); task.DisposeCountdownEvent.Wait(); @@ -214,6 +223,107 @@ namespace Org.Apache.REEF.Evaluator.Tests Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId); } + /// <summary> + /// Tests that suspend ends the task and invokes the right handler. + /// </summary> + [Fact] + public void TestSuspendTask() + { + var contextId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + + var injector = GetInjector(typeof(CountDownAction), contextId, taskId); + var taskRuntime = injector.GetInstance<TaskRuntime>(); + taskRuntime.RunTask(); + + var taskInterface = injector.GetInstance<ITask>(); + Assert.True(taskInterface is TestTask); + + var task = taskInterface as TestTask; + if (task == null) + { + throw new Exception("Task is expected to be an instance of TestTask."); + } + + taskRuntime.Suspend(null); + + task.FinishCountdownEvent.Wait(); + task.DisposeCountdownEvent.Wait(); + + Assert.True(task.SuspendInvoked); + } + + /// <summary> + /// Tests that suspend is not invoked after task is done. + /// </summary> + [Fact] + public void TestSuspendTaskAfterDoneIsNotSuspended() + { + var contextId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + + var injector = GetInjector(contextId, taskId); + var taskRuntime = injector.GetInstance<TaskRuntime>(); + taskRuntime.RunTask(); + + var taskInterface = injector.GetInstance<ITask>(); + Assert.True(taskInterface is TestTask); + + var task = taskInterface as TestTask; + if (task == null) + { + throw new Exception("Task is expected to be an instance of TestTask."); + } + + task.FinishCountdownEvent.Wait(); + task.DisposeCountdownEvent.Wait(); + + var stopHandlers = injector.GetNamedInstance<TaskConfigurationOptions.StopHandlers, ISet<IObserver<ITaskStop>>>(); + + var testTaskEventStopHandler = stopHandlers.Single() as TestTaskEventHandler; + if (testTaskEventStopHandler == null) + { + throw new Exception("Event handler is not expected to be null."); + } + + Assert.Equal(testTaskEventStopHandler.StopInvoked.Value, taskId); + + taskRuntime.Suspend(null); + Assert.False(task.SuspendInvoked); + } + + /// <summary> + /// Tests that suspend is not invoked after task is done. + /// </summary> + [Fact] + public void TestSuspendTaskAfterFailureIsNotSuspended() + { + var contextId = Guid.NewGuid().ToString(); + var taskId = Guid.NewGuid().ToString(); + + var injector = GetInjector(typeof(ExceptionAction), contextId, taskId); + var taskRuntime = injector.GetInstance<TaskRuntime>(); + taskRuntime.RunTask(); + + var task = injector.GetInstance<TestTask>(); + + task.DisposeCountdownEvent.Wait(); + + var stopHandlers = injector.GetNamedInstance<TaskConfigurationOptions.StopHandlers, ISet<IObserver<ITaskStop>>>(); + + var testTaskEventStopHandler = stopHandlers.Single() as TestTaskEventHandler; + if (testTaskEventStopHandler == null) + { + throw new Exception("Event handler is not expected to be null."); + } + + Assert.True(testTaskEventStopHandler.StopInvoked.IsPresent()); + Assert.Equal(taskRuntime.GetTaskState(), TaskState.Failed); + + taskRuntime.Suspend(null); + Assert.False(task.SuspendInvoked); + } + private static IInjector GetInjector(string contextId = "contextId", string taskId = "taskId") { return GetInjector(typeof(DefaultAction), contextId, taskId); @@ -233,6 +343,7 @@ namespace Org.Apache.REEF.Evaluator.Tests .Set(TaskConfiguration.OnTaskStart, GenericType<TestTaskEventHandler>.Class) .Set(TaskConfiguration.OnTaskStop, GenericType<TestTaskEventHandler>.Class) .Set(TaskConfiguration.Task, GenericType<TestTask>.Class) + .Set(TaskConfiguration.OnSuspend, GenericType<TestTask>.Class) .Build(); var actionConfig = confBuilder @@ -279,7 +390,7 @@ namespace Org.Apache.REEF.Evaluator.Tests } } - private sealed class TestTask : ITask + private sealed class TestTask : ITask, IObserver<ISuspendEvent> { private readonly IAction _action; @@ -291,6 +402,8 @@ namespace Org.Apache.REEF.Evaluator.Tests _action = action; } + public bool SuspendInvoked { get; private set; } + public CountdownEvent FinishCountdownEvent { get; private set; } public CountdownEvent DisposeCountdownEvent { get; private set; } @@ -313,11 +426,29 @@ namespace Org.Apache.REEF.Evaluator.Tests return null; } + + public void OnNext(ISuspendEvent value) + { + _action.OnSuspend(); + SuspendInvoked = true; + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } } private interface IAction { - Action Value { get; } + Action Value { get; } + + void OnSuspend(); } private sealed class DefaultAction : IAction @@ -335,6 +466,10 @@ namespace Org.Apache.REEF.Evaluator.Tests return () => { }; } } + + public void OnSuspend() + { + } } private sealed class ExceptionAction : IAction @@ -354,6 +489,10 @@ namespace Org.Apache.REEF.Evaluator.Tests }; } } + + public void OnSuspend() + { + } } private sealed class CountDownAction : IAction @@ -375,6 +514,11 @@ namespace Org.Apache.REEF.Evaluator.Tests } } + public void OnSuspend() + { + CountdownEvent.Signal(); + } + public CountdownEvent CountdownEvent { get; private set; } } }
