Repository: reef Updated Branches: refs/heads/master b5647362a -> 414d4b451
[REEF-1392] Adding IObserver<ICloseEvent> for IMRU tasks * IMRU tasks implement IObserver<ICloseEvent> * Task will return after it receives the close event if possible or enforce to close * Update TaskStateMachine to allow transit from TaskWaitingForClose with CompletedTask event to TaskClosedByDriver state * Adding test cases JIRA: [REEF-1392](https://issues.apache.org/jira/browse/REEF-1392) This closes #1009 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/414d4b45 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/414d4b45 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/414d4b45 Branch: refs/heads/master Commit: 414d4b45114c6229720262f3588ecff3a35334b1 Parents: b564736 Author: Julia Wang <[email protected]> Authored: Wed Jun 1 19:28:50 2016 -0700 Committer: dhruv <[email protected]> Committed: Wed Jun 1 22:47:08 2016 -0700 ---------------------------------------------------------------------- .../TestTaskStates.cs | 17 +- .../OnREEF/Driver/IMRUDriver.cs | 3 + .../Driver/StateMachine/TaskStateMachine.cs | 1 + .../OnREEF/Driver/TaskManager.cs | 25 +-- .../OnREEF/IMRUTasks/MapTaskHost.cs | 77 ++++++- .../OnREEF/IMRUTasks/UpdateTaskHost.cs | 81 ++++++- .../EnforceCloseTimeoutMilliseconds.cs | 31 +++ .../Org.Apache.REEF.IMRU.csproj | 1 + .../Functional/Bridge/TestCloseTask.cs | 218 +++++++++++++++++-- .../Org.Apache.REEF.Tests.csproj | 2 +- 10 files changed, 413 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs index 0c858da..561202e 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs +++ 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskStates.cs @@ -125,14 +125,25 @@ namespace Org.Apache.REEF.IMRU.Tests Action moveNext = () => taskState.MoveNext(TaskStateEvent.RunningTask); Assert.Throws<TaskStateTransitionException>(moveNext); - moveNext = () => taskState.MoveNext(TaskStateEvent.CompletedTask); - Assert.Throws<TaskStateTransitionException>(moveNext); - moveNext = () => taskState.MoveNext(TaskStateEvent.SubmittedTask); Assert.Throws<TaskStateTransitionException>(moveNext); } /// <summary> + /// This is to test from WaitingTaskToClose to receiving CompletedTask event + /// </summary> + [Fact] + public void TestRunningToWaitingTaskToCloseToComplete() + { + var taskState = new TaskStateMachine(); + taskState.MoveNext(TaskStateEvent.SubmittedTask); + taskState.MoveNext(TaskStateEvent.RunningTask); + taskState.MoveNext(TaskStateEvent.WaitingTaskToClose); + taskState.MoveNext(TaskStateEvent.CompletedTask); + Assert.Equal(TaskState.TaskClosedByDriver, taskState.CurrentState); + } + + /// <summary> /// This is to test from RunningTask to TaskFailedByEvaluatorFailure. 
/// </summary> [Fact] http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs index 59be761..58b75ed 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs @@ -329,6 +329,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, taskId) .Set(TaskConfiguration.Task, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class) + .Set(TaskConfiguration.OnClose, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class) .Build(), _configurationManager.MapFunctionConfiguration, mapSpecificConfig, @@ -351,6 +352,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver IMRUConstants.UpdateTaskName) .Set(TaskConfiguration.Task, GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class) + .Set(TaskConfiguration.OnClose, + GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class) .Build(), _configurationManager.UpdateFunctionConfiguration, _configurationManager.ResultHandlerConfiguration, http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs index 17c0764..b1ed8dc 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/StateMachine/TaskStateMachine.cs @@ -50,6 +50,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine { new StateTransition<TaskState, 
TaskStateEvent>(TaskState.TaskWaitingForClose, TaskStateEvent.FailedTaskSystemError), TaskState.TaskClosedByDriver }, { new StateTransition<TaskState, TaskStateEvent>(TaskState.TaskWaitingForClose, TaskStateEvent.FailedTaskEvaluatorError), TaskState.TaskClosedByDriver }, { new StateTransition<TaskState, TaskStateEvent>(TaskState.TaskWaitingForClose, TaskStateEvent.FailedTaskCommunicationError), TaskState.TaskClosedByDriver }, + { new StateTransition<TaskState, TaskStateEvent>(TaskState.TaskWaitingForClose, TaskStateEvent.CompletedTask), TaskState.TaskClosedByDriver }, { new StateTransition<TaskState, TaskStateEvent>(TaskState.TaskFailedBySystemError, TaskStateEvent.FailedTaskEvaluatorError), TaskState.TaskFailedByEvaluatorFailure }, { new StateTransition<TaskState, TaskStateEvent>(TaskState.TaskFailedByGroupCommunication, TaskStateEvent.FailedTaskEvaluatorError), TaskState.TaskFailedByEvaluatorFailure } }); http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs index 584809b..3bf6d75 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs @@ -201,13 +201,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <summary> /// This method is called when receiving ICompletedTask event during task running or system shutting down. 
- /// Removes the task from running tasks + /// Removes the task from running tasks if it was running /// Changes the task state from RunningTask to CompletedTask /// </summary> /// <param name="completedTask"></param> internal void RecordCompletedTask(ICompletedTask completedTask) { - RemoveRunningTask(completedTask.Id); + _runningTasks.Remove(completedTask.Id); UpdateState(completedTask.Id, TaskStateEvent.CompletedTask); } @@ -258,7 +258,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver var taskState = GetTaskState(taskId); if (taskState == StateMachine.TaskState.TaskRunning) { - RemoveRunningTask(taskId); + if (!_runningTasks.ContainsKey(taskId)) + { + var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId); + Exceptions.Throw(new IMRUSystemException(msg), Logger); + } + _runningTasks.Remove(taskId); } UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError); @@ -266,20 +271,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Removes a task from running tasks if it exists in the running tasks collection - /// </summary> - /// <param name="taskId"></param> - private void RemoveRunningTask(string taskId) - { - if (!_runningTasks.ContainsKey(taskId)) - { - var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId); - Exceptions.Throw(new IMRUSystemException(msg), Logger); - } - _runningTasks.Remove(taskId); - } - - /// <summary> /// Updates task state for a given taskId based on the task event /// </summary> /// <param name="taskId"></param> http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs index c4a101d..5f9823a 100644 --- 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs @@ -16,7 +16,10 @@ // under the License. using System; +using System.Text; +using System.Threading; using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.Driver; using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; @@ -24,6 +27,7 @@ using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks @@ -33,7 +37,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// </summary> /// <typeparam name="TMapInput">Map input</typeparam> /// <typeparam name="TMapOutput">Map output</typeparam> - internal sealed class MapTaskHost<TMapInput, TMapOutput> : ITask + [ThreadSafe] + internal sealed class MapTaskHost<TMapInput, TMapOutput> : ITask, IObserver<ICloseEvent> { private static readonly Logger Logger = Logger.GetLogger(typeof(MapTaskHost<TMapInput, TMapOutput>)); @@ -43,14 +48,38 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly bool _invokeGC; /// <summary> + /// When receiving a close event, this variable is set to 1. At the beginning of each task iteration, + /// if this variable is set to 1, the task will break from the loop and return from the Call() method. + /// </summary> + private long _shouldCloseTask = 0; + + /// <summary> + /// Before the task is returned, this variable is set to 1. + /// Close handler will check this variable to decide if it needs to throw an exception. 
+ /// </summary> + private long _isTaskStopped = 0; + + /// <summary> + /// Waiting time for the task to close by itself + /// </summary> + private readonly int _enforceCloseTimeoutMilliseconds; + + /// <summary> + /// An event that will wait in close handler until it is either signaled from Call method or timeout. + /// </summary> + private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false); + + /// <summary> /// </summary> /// <param name="mapTask">The MapTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> + /// <param name="enforceCloseTimeoutMilliseconds">Timeout to enforce the task to close if receiving task close event</param> /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> [Inject] private MapTaskHost( IMapFunction<TMapInput, TMapOutput> mapTask, IGroupCommClient groupCommunicationsClient, + [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeoutMilliseconds, [Parameter(typeof(InvokeGC))] bool invokeGC) { _mapTask = mapTask; @@ -59,6 +88,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); _dataReducer = cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName); _invokeGC = invokeGC; + _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds; } /// <summary> @@ -68,7 +98,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <returns></returns> public byte[] Call(byte[] memento) { - while (true) + while (Interlocked.Read(ref _shouldCloseTask) == 0) { if (_invokeGC) { @@ -91,14 +121,55 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks _dataReducer.Send(result); } + + Interlocked.Exchange(ref _isTaskStopped, 1); + + if (Interlocked.Read(ref _shouldCloseTask) == 1) + { + _waitToCloseEvent.Set(); + } return null; } /// <summary> - /// Dispose function + /// Task close 
handler. + /// If the closed event is sent from driver, set _shouldCloseTask to 1 so that to inform the Call() to stop at the end of the current iteration. + /// Then waiting for the signal from Call method. Either it is signaled or after _enforceCloseTimeoutMilliseconds, + /// checks if the task has been stopped. If not, throw IMRUTaskSystemException to enforce the task to stop. + /// </summary> + /// <param name="closeEvent"></param> + public void OnNext(ICloseEvent closeEvent) + { + var msg = Encoding.UTF8.GetString(closeEvent.Value.Value); + if (closeEvent.Value.IsPresent() && msg.Equals(TaskManager.CloseTaskByDriver)) + { + Logger.Log(Level.Info, "The task received close event with message: {0}.", msg); + Interlocked.Exchange(ref _shouldCloseTask, 1); + + _waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds)); + + if (Interlocked.Read(ref _isTaskStopped) == 0) + { + throw new IMRUTaskSystemException(TaskManager.TaskKilledByDriver); + } + } + } + + /// <summary> + /// Dispose function /// </summary> public void Dispose() { } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index f03a8e1..4f9ad9b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -16,7 +16,10 @@ // under the License. 
using System; +using System.Text; +using System.Threading; using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.Driver; using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; @@ -24,6 +27,7 @@ using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks @@ -34,7 +38,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <typeparam name="TMapInput">Map input</typeparam> /// <typeparam name="TMapOutput">Map output</typeparam> /// <typeparam name="TResult">Final result</typeparam> - internal sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : ITask + [ThreadSafe] + internal sealed class UpdateTaskHost<TMapInput, TMapOutput, TResult> : ITask, IObserver<ICloseEvent> { private static readonly Logger Logger = Logger.GetLogger(typeof(UpdateTaskHost<TMapInput, TMapOutput, TResult>)); @@ -45,16 +50,40 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly IIMRUResultHandler<TResult> _resultHandler; /// <summary> + /// When receiving a close event, this variable is set to 1. At the beginning of each task iteration, + /// if this variable is set to 1, the task will break from the loop and return from the Call() method. + /// </summary> + private long _shouldCloseTask = 0; + + /// <summary> + /// Before the task is returned, this variable is set to 1. + /// Close handler will check this variable to decide if it needs to throw an exception. 
+ /// </summary> + private long _isTaskStopped = 0; + + /// <summary> + /// Waiting time for the task to close by itself + /// </summary> + private readonly int _enforceCloseTimeoutMilliseconds; + + /// <summary> + /// An event that will wait in close handler until it is either signaled from Call method or timeout. + /// </summary> + private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false); + + /// <summary> /// </summary> /// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> /// <param name="resultHandler">Result handler</param> + /// <param name="enforceCloseTimeoutMilliseconds">Timeout in milliseconds to enforce the task to close if receiving task close event</param> /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> [Inject] private UpdateTaskHost( IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask, IGroupCommClient groupCommunicationsClient, IIMRUResultHandler<TResult> resultHandler, + [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeoutMilliseconds, [Parameter(typeof(InvokeGC))] bool invokeGC) { _updateTask = updateTask; @@ -64,6 +93,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks _dataReceiver = cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName); _invokeGC = invokeGC; _resultHandler = resultHandler; + _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds; } /// <summary> @@ -76,7 +106,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks var updateResult = _updateTask.Initialize(); int iterNo = 0; - while (updateResult.HasMapInput) + while (updateResult.HasMapInput && Interlocked.Read(ref _shouldCloseTask) == 0) { iterNo++; @@ -104,19 +134,62 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks } } - MapInputWithControlMessage<TMapInput> stopMessage = + if (Interlocked.Read(ref _shouldCloseTask) == 0) + { + 
MapInputWithControlMessage<TMapInput> stopMessage = new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop); - _dataAndControlMessageSender.Send(stopMessage); + _dataAndControlMessageSender.Send(stopMessage); + } _resultHandler.Dispose(); + Interlocked.Exchange(ref _isTaskStopped, 1); + + if (Interlocked.Read(ref _shouldCloseTask) == 1) + { + _waitToCloseEvent.Set(); + } return null; } /// <summary> + /// Task close handler. + /// If the closed event is sent from driver, set _shouldCloseTask to 1 so that to inform the Call() to stop at the end of the current iteration. + /// Then waiting for the signal from Call method. Either it is signaled or after _enforceCloseTimeoutMilliseconds, + /// checks if the task has been stopped. If not, throw IMRUTaskSystemException to enforce the task to stop. + /// </summary> + /// <param name="closeEvent"></param> + public void OnNext(ICloseEvent closeEvent) + { + var msg = Encoding.UTF8.GetString(closeEvent.Value.Value); + if (closeEvent.Value.IsPresent() && msg.Equals(TaskManager.CloseTaskByDriver)) + { + Logger.Log(Level.Info, "The task received close event with message: {0}.", msg); + Interlocked.Exchange(ref _shouldCloseTask, 1); + + _waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds)); + + if (Interlocked.Read(ref _isTaskStopped) == 0) + { + throw new IMRUTaskSystemException(TaskManager.TaskKilledByDriver); + } + } + } + + /// <summary> /// Dispose function /// </summary> public void Dispose() { } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs new file mode 100644 index 0000000..e177895 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/EnforceCloseTimeoutMilliseconds.cs @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.IMRU.OnREEF.Parameters +{ + /// <summary> + /// When the driver sends a close event to a task, it expects the task to close gracefully. + /// If the task is still not closed after the specified timeout, the close handler throws an exception + /// to force the task to close; this parameter is how long the handler waits (in milliseconds). 
+ /// </summary> + [NamedParameter("Enforce the task to close after waiting for this much time (in milliseconds).", "EnforceCloseTimeout", "1000")] + internal sealed class EnforceCloseTimeoutMilliseconds : Name<int> + { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index da19271..3f62b1f 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -99,6 +99,7 @@ under the License. <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessageCodec.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapInputwithControlMessagePipelineDataConverter.cs" /> + <Compile Include="OnREEF\Parameters\EnforceCloseTimeoutMilliseconds.cs" /> <Compile Include="OnREEF\Parameters\InvokeGC .cs" /> <Compile Include="OnREEF\Parameters\AllowedFailedEvaluatorsFraction.cs" /> <Compile Include="OnREEF\Parameters\CoresForUpdateTask.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs index a18a211..2e97320 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestCloseTask.cs @@ -27,6 +27,9 @@ using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Driver.Task; +using 
Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.Configuration; @@ -51,6 +54,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge private const string NoMessage = "NO_MESSAGE"; private const string CompletedValidationMessage = "CompletedValidationmessage"; private const string FailToCloseTaskMessage = "FailToCloseTaskMessage"; + private const string BreakTaskMessage = "BreakTaskMessage"; + private const string EnforceToCloseMessage = "EnforceToCloseMessage"; /// <summary> /// This test is close a running task with a close handler registered @@ -69,6 +74,40 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } /// <summary> + /// This test is to close a running task and enforce it to break and return after the current iteration + /// </summary> + [Fact] + public void TestBreakTaskOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForBreakTask()), typeof(CloseTaskTestDriver), 1, "TestBreakTask", "local", testFolder); + ValidateSuccessForLocalRuntime(1, testFolder: testFolder); + ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 1); + var messages = new List<string>(); + messages.Add(DisposeMessageFromDriver); + messages.Add(BreakTaskMessage); + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, -1); + CleanUp(testFolder); + } + + /// <summary> + /// This test is to close a running task that does not return by itself; after the timeout, the close handler throws an exception to force the task to stop + /// </summary> + [Fact] + public void TestEnforceCloseTaskOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + 
TestRun(DriverConfigurations(DisposeMessageFromDriver, GetTaskConfigurationForEnforceToCloseTask()), typeof(CloseTaskTestDriver), 1, "TestEnforceCloseTask", "local", testFolder); + ValidateSuccessForLocalRuntime(1, 1, 0, testFolder); + ValidateMessageSuccessfullyLoggedForDriver(CompletedValidationMessage, testFolder, 0); + var messages = new List<string>(); + messages.Add(DisposeMessageFromDriver); + messages.Add(EnforceToCloseMessage); + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, -1); + CleanUp(testFolder); + } + + /// <summary> /// This test is to close a running task with exception throw in close handler /// Expect to receive Exception in Failed Task event handler in driver /// </summary> @@ -128,8 +167,31 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { return TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, "TaskID") - .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseTestTask>.Class) - .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseTestTask>.Class) + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByReturnTestTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseByReturnTestTask>.Class) + .Build(); + } + + private IConfiguration GetTaskConfigurationForBreakTask() + { + return TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID") + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class) + .Build(); + } + private IConfiguration GetTaskConfigurationForEnforceToCloseTask() + { + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, "TaskID-EnforceToClose") + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class) + .Set(TaskConfiguration.OnClose, 
GenericType<TestCloseTask.CloseByBreakAndEnforceToStopTask>.Class) + .Build(); + + return TangFactory.GetTang() + .NewConfigurationBuilder(taskConfig) + .BindIntNamedParam<EnforceCloseTimeoutMilliseconds>("1000") + .BindNamedParameter<EnforceClose, bool>(GenericType<EnforceClose>.Class, "true") .Build(); } @@ -137,8 +199,8 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { return TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, "TaskID-FailToClose") - .Set(TaskConfiguration.Task, GenericType<TestCloseTask.FailToCloseTask>.Class) - .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.FailToCloseTask>.Class) + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.CloseByThrowExceptionTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<TestCloseTask.CloseByThrowExceptionTask>.Class) .Build(); } @@ -146,7 +208,7 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { return TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, "TaskID-NoCloseHandler") - .Set(TaskConfiguration.Task, GenericType<TestCloseTask.NoCloseHandlerTask>.Class) + .Set(TaskConfiguration.Task, GenericType<TestCloseTask.MissingCloseHandlerTask>.Class) .Build(); } @@ -194,14 +256,14 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { private readonly IEvaluatorRequestor _requestor; private int _contextNumber = 0; - private string _disposeMessage; - private IConfiguration _taskConfiguration; + private readonly string _disposeMessage; + private readonly IConfiguration _taskConfiguration; [Inject] private CloseTaskTestDriver(IEvaluatorRequestor evaluatorRequestor, [Parameter(typeof(DisposeMessage))] string disposeMessage, [Parameter(typeof(TaskConfigurationString))] string taskConfigString, - AvroConfigurationSerializer avroConfigurationSerializer) + IConfigurationSerializer avroConfigurationSerializer) { _requestor = evaluatorRequestor; _disposeMessage = disposeMessage; @@ -246,7 +308,11 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge { 
Assert.Contains(DefaultTaskCloseHandler.ExceptionMessage, failedExeption); } - + if (value.Id.EndsWith("TaskID-EnforceToClose")) + { + Assert.Contains(TaskManager.TaskKilledByDriver, failedExeption); + } + value.GetActiveContext().Value.Dispose(); } @@ -274,12 +340,15 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } } - private sealed class CloseTestTask : ITask, IObserver<ICloseEvent> + /// <summary> + /// This test task receives close event, then signals Call() method to properly return. + /// </summary> + private sealed class CloseByReturnTestTask : ITask, IObserver<ICloseEvent> { private readonly CountdownEvent _suspendSignal = new CountdownEvent(1); [Inject] - private CloseTestTask() + private CloseByReturnTestTask([Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeout) { } @@ -322,12 +391,123 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } } - private sealed class FailToCloseTask : ITask, IObserver<ICloseEvent> + /// <summary> + /// This is a testing task. It serves for two test cases. + /// In the first case, EnforceClose is false (default). When the task receives the close event, it signals the Call method + /// to let it continue the iteration. As _shouldCloseTask is set to 1, the Call() will return after + /// completing the current iteration. + /// In the second case, EnforceClose is set to true. When the task receives the close event, it sets + /// _shouldCloseTask to 1. As the task is hung in this scenario, Call() would never return. + /// After waiting for _enforceCloseTimeoutMilliseconds, the close handler throws an exception, enforcing the task to stop. 
+ /// </summary> + private sealed class CloseByBreakAndEnforceToStopTask : ITask, IObserver<ICloseEvent> + { + private long _shouldCloseTask = 0; + private long _isTaskStopped = 0; + private readonly bool _enforceClose; + private readonly int _enforceCloseTimeoutMilliseconds; + + private readonly CountdownEvent _suspendSignal1 = new CountdownEvent(1); + private readonly CountdownEvent _suspendSignal2 = new CountdownEvent(1); + private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false); + + [Inject] + private CloseByBreakAndEnforceToStopTask( + [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeoutMilliseconds, + [Parameter(typeof(EnforceClose))] bool enforceClose) + { + _enforceClose = enforceClose; + _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds; + } + + public byte[] Call(byte[] memento) + { + int iterate = 1; + + while (Interlocked.Read(ref _shouldCloseTask) == 0 && iterate < 100) + { + iterate++; + if (_enforceClose) + { + _suspendSignal1.Wait(); + } + else + { + _suspendSignal2.Wait(); + } + } + + Interlocked.Exchange(ref _isTaskStopped, 1); + + if (Interlocked.Read(ref _shouldCloseTask) == 1) + { + Logger.Log(Level.Info, BreakTaskMessage); + _waitToCloseEvent.Set(); + } + + return null; + } + + public void Dispose() + { + Logger.Log(Level.Info, "Task is disposed."); + } + + /// <summary> + /// When the close event is received, it sets _shouldCloseTask to 1. + /// If _enforceClose is false, _suspendSignal2 is signaled to let the task continue to run. This simulates a + /// task that is running properly and will break after completing the current iteration. Call() then sets _waitToCloseEvent + /// to let the flow in the close event handler continue. + /// If _enforceClose is true, _suspendSignal1 is never signaled; this simulates a task that is hung. + /// After waiting for the specified time, the close handler throws an exception to force the task to stop. 
+ /// </summary> + /// <param name="closeEvent"></param> + public void OnNext(ICloseEvent closeEvent) + { + if (closeEvent.Value.IsPresent() && Encoding.UTF8.GetString(closeEvent.Value.Value).Equals(DisposeMessageFromDriver)) + { + Logger.Log(Level.Info, "Closed event received in task:" + Encoding.UTF8.GetString(closeEvent.Value.Value)); + Interlocked.Exchange(ref _shouldCloseTask, 1); + if (!_enforceClose) + { + _suspendSignal2.Signal(); + } + + _waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds)); + + if (Interlocked.Read(ref _isTaskStopped) == 0) + { + Logger.Log(Level.Info, EnforceToCloseMessage); + throw new IMRUTaskSystemException(TaskManager.TaskKilledByDriver); + } + } + else + { + throw new Exception("Expected close event message is not received."); + } + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + } + + /// <summary> + /// This is a test task for the scenario in which the task receives a close event and, instead of + /// letting the task return properly, throws an exception. + /// </summary> + private sealed class CloseByThrowExceptionTask : ITask, IObserver<ICloseEvent> { private readonly CountdownEvent _suspendSignal = new CountdownEvent(1); [Inject] - private FailToCloseTask() + private CloseByThrowExceptionTask() { } @@ -370,10 +550,13 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge } } - private sealed class NoCloseHandlerTask : ITask + /// <summary> + /// This task doesn't implement a close handler. It is used to test the closeHandlerNoBound exception. 
+ /// </summary> + private sealed class MissingCloseHandlerTask : ITask { [Inject] - private NoCloseHandlerTask() + private MissingCloseHandlerTask() { } @@ -389,5 +572,10 @@ namespace Org.Apache.REEF.Tests.Functional.Bridge Logger.Log(Level.Info, "Task is disposed."); } } + + [NamedParameter("Enforce the task to close", "EnforceClose", "false")] + private sealed class EnforceClose : Name<bool> + { + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/414d4b45/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 3840a38..932d3f2 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -183,7 +183,7 @@ under the License. <Project>{6dc3b04e-2b99-4fda-bd23-2c7864f4c477}</Project> <Name>Org.Apache.REEF.IMRU.Examples</Name> </ProjectReference> - <ProjectReference Include="..\Org.Apache.REEF.IMRU\Org.Apache.REEF.IMRU.csproj"> + <ProjectReference Include="$(SolutionDir)\Org.Apache.REEF.IMRU\Org.Apache.REEF.IMRU.csproj"> <Project>{cc797c57-b465-4d11-98ac-edaaef5899a6}</Project> <Name>Org.Apache.REEF.IMRU</Name> </ProjectReference>
