Repository: reef Updated Branches: refs/heads/master 68e3cb0f8 -> 6cde8ce78
[REEF-1423] Tasks are not disposed after they are closed. Updated TaskRuntime to ensure Dispose is called after the task is closed. Refactored the IMRU task close handler. Added test cases. JIRA: [REEF-1423](https://issues.apache.org/jira/browse/REEF-1423) This closes #1032 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/6cde8ce7 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/6cde8ce7 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/6cde8ce7 Branch: refs/heads/master Commit: 6cde8ce7808c3ca99b466117d83c7d8eb9606aed Parents: 68e3cb0 Author: Julia Wang <[email protected]> Authored: Wed Jun 8 19:42:30 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Fri Jun 17 17:52:50 2016 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/Task/TaskRuntime.cs | 18 ++ lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs | 6 + .../OnREEF/IMRUTasks/MapTaskHost.cs | 73 ++--- .../OnREEF/IMRUTasks/TaskCloseCoordinator.cs | 115 +++++++ .../OnREEF/IMRUTasks/UpdateTaskHost.cs | 63 +--- .../Org.Apache.REEF.IMRU.csproj | 1 + .../Functional/Bridge/TestDisposeTasks.cs | 301 +++++++++++++++++++ .../Functional/IMRU/IMRUCloseTaskTest.cs | 30 +- .../Org.Apache.REEF.Tests.csproj | 1 + 9 files changed, 487 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index b14e3b9..7c94615 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -181,6 +181,24 @@ namespace 
Org.Apache.REEF.Common.Runtime.Evaluator.Task Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Close.", Logger); _currentStatus.SetException(e); } + finally + { + try + { + if (_userTask != null) + { + _userTask.Dispose(); + } + } + catch (Exception e) + { + Utilities.Diagnostics.Exceptions.CaughtAndThrow( + new InvalidOperationException("Cannot dispose task properly", e), + Level.Error, + "Exception during task dispose.", + Logger); + } + } } public void Suspend(byte[] message) http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs index cb796eb..c9d65e2 100644 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs +++ b/lang/cs/Org.Apache.REEF.Common/Tasks/ITask.cs @@ -19,6 +19,12 @@ using System; namespace Org.Apache.REEF.Common.Tasks { + /// <summary> + /// Task interface. Clients should implement their own tasks. + /// Each Task should implement Dispose() to release its resources, if any. + /// Dispose will be called after Call() returns or the task is closed. If Dispose throws when the task is closed, + /// the error is logged and rethrown wrapped in an InvalidOperationException.
+ /// </summary> public interface ITask : IDisposable { byte[] Call(byte[] memento); http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs index 6f42cd5..86b1f7c 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs @@ -48,48 +48,31 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly bool _invokeGC; /// <summary> - /// When receiving a close event, this variable is set to 1. At the beginning of each task iteration, - /// if this variable is set to 1, the task will break from the loop and return from the Call() method. - /// </summary> - private long _shouldCloseTask = 0; - - /// <summary> - /// Before the task is returned, this variable is set to 1. - /// Close handler will check this variable to decide if it needs to throw an exception. - /// </summary> - private long _isTaskStopped = 0; - - /// <summary> /// Shows if the object has been disposed. /// </summary> private int _disposed = 0; /// <summary> - /// Waiting time for the task to close by itself - /// </summary> - private readonly int _enforceCloseTimeoutMilliseconds; - - /// <summary> - /// An event that will wait in close handler until it is either signaled from Call method or timeout. 
+ /// Group Communication client for the task /// </summary> - private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false); + private readonly IGroupCommClient _groupCommunicationsClient; /// <summary> - /// Group Communication client for the task + /// Task close Coordinator to handle the work when receiving task close event /// </summary> - private readonly IGroupCommClient _groupCommunicationsClient; + private readonly TaskCloseCoordinator _taskCloseCoordinator; /// <summary> /// </summary> /// <param name="mapTask">The MapTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> - /// <param name="enforceCloseTimeoutMilliseconds">Timeout to enforce the task to close if receiving task close event</param> + /// <param name="taskCloseCoordinator">Task close Coordinator</param> /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> [Inject] private MapTaskHost( IMapFunction<TMapInput, TMapOutput> mapTask, IGroupCommClient groupCommunicationsClient, - [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeoutMilliseconds, + TaskCloseCoordinator taskCloseCoordinator, [Parameter(typeof(InvokeGC))] bool invokeGC) { _mapTask = mapTask; @@ -99,7 +82,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks cg.GetBroadcastReceiver<MapInputWithControlMessage<TMapInput>>(IMRUConstants.BroadcastOperatorName); _dataReducer = cg.GetReduceSender<TMapOutput>(IMRUConstants.ReduceOperatorName); _invokeGC = invokeGC; - _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds; + _taskCloseCoordinator = taskCloseCoordinator; } /// <summary> @@ -109,7 +92,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <returns></returns> public byte[] Call(byte[] memento) { - while (Interlocked.Read(ref _shouldCloseTask) == 0) + MapControlMessage controlMessage = MapControlMessage.AnotherRound; + + while 
(!_taskCloseCoordinator.ShouldCloseTask() && controlMessage != MapControlMessage.Stop) { if (_invokeGC) { @@ -118,56 +103,32 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks GC.WaitForPendingFinalizers(); } - TMapOutput result; - using ( MapInputWithControlMessage<TMapInput> mapInput = _dataAndMessageReceiver.Receive()) { - if (mapInput.ControlMessage == MapControlMessage.Stop) + controlMessage = mapInput.ControlMessage; + if (controlMessage != MapControlMessage.Stop) { - break; + _dataReducer.Send(_mapTask.Map(mapInput.Message)); } - result = _mapTask.Map(mapInput.Message); } - - _dataReducer.Send(result); } - Interlocked.Exchange(ref _isTaskStopped, 1); - - if (Interlocked.Read(ref _shouldCloseTask) == 1) - { - _waitToCloseEvent.Set(); - } + _taskCloseCoordinator.SignalTaskStopped(); return null; } /// <summary> - /// Task close handler. - /// If the closed event is sent from driver, set _shouldCloseTask to 1 so that to inform the Call() to stop at the end of the current iteration. - /// Then waiting for the signal from Call method. Either it is signaled or after _enforceCloseTimeoutMilliseconds, - /// checks if the task has been stopped. If not, throw IMRUTaskSystemException to enforce the task to stop. + /// Task close handler. Calls TaskCloseCoordinator to handle the event. 
/// </summary> /// <param name="closeEvent"></param> public void OnNext(ICloseEvent closeEvent) { - var msg = Encoding.UTF8.GetString(closeEvent.Value.Value); - if (closeEvent.Value.IsPresent() && msg.Equals(TaskManager.CloseTaskByDriver)) - { - Logger.Log(Level.Info, "The task received close event with message: {0}.", msg); - Interlocked.Exchange(ref _shouldCloseTask, 1); - - _waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds)); - - if (Interlocked.Read(ref _isTaskStopped) == 0) - { - throw new IMRUTaskSystemException(TaskManager.TaskKilledByDriver); - } - } + _taskCloseCoordinator.HandleEvent(closeEvent); } /// <summary> - /// Dispose function + /// Dispose function. Dispose IGroupCommunicationsClient. /// </summary> public void Dispose() { http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs new file mode 100644 index 0000000..f60271a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Text; +using System.Threading; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.IMRU.OnREEF.Parameters; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Attributes; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks +{ + /// <summary> + /// This class provides a method to handle the Task close event. It is called from the close handlers (OnNext) of MapTaskHost and UpdateTaskHost. + /// It wraps a flag indicating whether the task should be closed and an event the task signals when it has stopped, + /// so as to coordinate between the task and the close handler. + /// </summary> + [ThreadSafe] + internal sealed class TaskCloseCoordinator + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskCloseCoordinator)); + + /// <summary> + /// When a close event is received, this variable is set to 1. At the beginning of each task iteration, + /// if this variable is set to 1, the task will break from the loop and return from the Call() method. + /// </summary> + private long _shouldCloseTask = 0; + + /// <summary> + /// Waiting time for the task to close by itself + /// </summary> + private readonly int _enforceCloseTimeoutMilliseconds; + + /// <summary> + /// An event the close handler waits on until it is either signaled from the Call method or the wait times out.
+ /// </summary> + private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false); + + /// <summary> + /// Creates a coordinator that handles the task close event and manages the wait/signal state when closing the task. + /// </summary> + /// <param name="enforceCloseTimeoutMilliseconds">Timeout in milliseconds to enforce the task to close after receiving a task close event</param> + [Inject] + private TaskCloseCoordinator([Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeoutMilliseconds) + { + _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds; + } + + /// <summary> + /// Handle Task close event. + /// Sets _shouldCloseTask to 1 to inform the task to stop at the end of the current iteration, + /// then waits until either the Call method signals _waitToCloseEvent or _enforceCloseTimeoutMilliseconds elapses. + /// If the close event was sent from the driver and _waitToCloseEvent has not been signaled, throws + /// IMRUTaskSystemException to enforce the task to stop. + /// </summary> + /// <param name="closeEvent">The close event; its optional message identifies whether the driver sent it.</param> + internal void HandleEvent(ICloseEvent closeEvent) + { + Interlocked.Exchange(ref _shouldCloseTask, 1); + var taskSignaled = _waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds)); + + if (closeEvent.Value.IsPresent()) + { + var msg = Encoding.UTF8.GetString(closeEvent.Value.Value); + if (msg.Equals(TaskManager.CloseTaskByDriver)) + { + Logger.Log(Level.Info, "The task received close event with message: {0}.", msg); + + if (!taskSignaled) + { + throw new IMRUTaskSystemException(TaskManager.TaskKilledByDriver); + } + } + } + else + { + Logger.Log(Level.Warning, "The task received close event with no message."); + } + } + + /// <summary> + /// Indicates if the task should be stopped.
+ /// </summary> + /// <returns>True once a close event has been received; otherwise false.</returns> + internal bool ShouldCloseTask() + { + return Interlocked.Read(ref _shouldCloseTask) == 1; + } + + /// <summary> + /// Called from the task right before Call() returns, to signal _waitToCloseEvent. + /// </summary> + internal void SignalTaskStopped() + { + _waitToCloseEvent.Set(); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index cfe121d..116bc63 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -50,50 +50,33 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly IIMRUResultHandler<TResult> _resultHandler; /// <summary> - /// When receiving a close event, this variable is set to 1. At the beginning of each task iteration, - /// if this variable is set to 1, the task will break from the loop and return from the Call() method. - /// </summary> - private long _shouldCloseTask = 0; - - /// <summary> - /// Before the task is returned, this variable is set to 1. - /// Close handler will check this variable to decide if it needs to throw an exception. - /// </summary> - private long _isTaskStopped = 0; - - /// <summary> /// Shows if the object has been disposed. /// </summary> private int _disposed = 0; /// <summary> - /// Waiting time for the task to close by itself - /// </summary> - private readonly int _enforceCloseTimeoutMilliseconds; - - /// <summary> - /// An event that will wait in close handler until it is either signaled from Call method or timeout.
+ /// Group Communication client for the task /// </summary> - private readonly ManualResetEventSlim _waitToCloseEvent = new ManualResetEventSlim(false); + private readonly IGroupCommClient _groupCommunicationsClient; /// <summary> - /// Group Communication client for the task + /// Task close Coordinator to handle the work when receiving task close event /// </summary> - private readonly IGroupCommClient _groupCommunicationsClient; + private readonly TaskCloseCoordinator _taskCloseCoordinator; /// <summary> /// </summary> /// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> /// <param name="resultHandler">Result handler</param> - /// <param name="enforceCloseTimeoutMilliseconds">Timeout in milliseconds to enforce the task to close if receiving task close event</param> + /// <param name="taskCloseCoordinator">Task close Coordinator</param> /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> [Inject] private UpdateTaskHost( IUpdateFunction<TMapInput, TMapOutput, TResult> updateTask, IGroupCommClient groupCommunicationsClient, IIMRUResultHandler<TResult> resultHandler, - [Parameter(typeof(EnforceCloseTimeoutMilliseconds))] int enforceCloseTimeoutMilliseconds, + TaskCloseCoordinator taskCloseCoordinator, [Parameter(typeof(InvokeGC))] bool invokeGC) { _updateTask = updateTask; @@ -104,7 +87,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks _dataReceiver = cg.GetReduceReceiver<TMapOutput>(IMRUConstants.ReduceOperatorName); _invokeGC = invokeGC; _resultHandler = resultHandler; - _enforceCloseTimeoutMilliseconds = enforceCloseTimeoutMilliseconds; + _taskCloseCoordinator = taskCloseCoordinator; } /// <summary> @@ -117,7 +100,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks var updateResult = _updateTask.Initialize(); int iterNo = 0; - while (updateResult.HasMapInput && Interlocked.Read(ref _shouldCloseTask) == 0) 
+ while (updateResult.HasMapInput && !_taskCloseCoordinator.ShouldCloseTask()) { iterNo++; @@ -145,7 +128,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks } } - if (Interlocked.Read(ref _shouldCloseTask) == 0) + if (!_taskCloseCoordinator.ShouldCloseTask()) { MapInputWithControlMessage<TMapInput> stopMessage = new MapInputWithControlMessage<TMapInput>(MapControlMessage.Stop); @@ -153,41 +136,21 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks } _resultHandler.Dispose(); - Interlocked.Exchange(ref _isTaskStopped, 1); - - if (Interlocked.Read(ref _shouldCloseTask) == 1) - { - _waitToCloseEvent.Set(); - } + _taskCloseCoordinator.SignalTaskStopped(); return null; } /// <summary> - /// Task close handler. - /// If the closed event is sent from driver, set _shouldCloseTask to 1 so that to inform the Call() to stop at the end of the current iteration. - /// Then waiting for the signal from Call method. Either it is signaled or after _enforceCloseTimeoutMilliseconds, - /// checks if the task has been stopped. If not, throw IMRUTaskSystemException to enforce the task to stop. + /// Task close handler. Call TaskCloseCoordinator to handle the event. /// </summary> /// <param name="closeEvent"></param> public void OnNext(ICloseEvent closeEvent) { - var msg = Encoding.UTF8.GetString(closeEvent.Value.Value); - if (closeEvent.Value.IsPresent() && msg.Equals(TaskManager.CloseTaskByDriver)) - { - Logger.Log(Level.Info, "The task received close event with message: {0}.", msg); - Interlocked.Exchange(ref _shouldCloseTask, 1); - - _waitToCloseEvent.Wait(TimeSpan.FromMilliseconds(_enforceCloseTimeoutMilliseconds)); - - if (Interlocked.Read(ref _isTaskStopped) == 0) - { - throw new IMRUTaskSystemException(TaskManager.TaskKilledByDriver); - } - } + _taskCloseCoordinator.HandleEvent(closeEvent); } /// <summary> - /// Dispose function + /// Dispose function. Dispose IGroupCommunicationsClient. 
/// </summary> public void Dispose() { http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index 3f62b1f..cdf87cc 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -94,6 +94,7 @@ under the License. <Compile Include="OnREEF\IMRUTasks\IMRUTaskGroupCommunicationException.cs" /> <Compile Include="OnREEF\IMRUTasks\IMRUTaskSystemException.cs" /> <Compile Include="OnREEF\IMRUTasks\MapTaskHost.cs" /> + <Compile Include="OnREEF\IMRUTasks\TaskCloseCoordinator.cs" /> <Compile Include="OnREEF\IMRUTasks\UpdateTaskHost.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapControlMessage.cs" /> <Compile Include="OnREEF\MapInputWithControlMessage\MapInputWithControlMessage.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs new file mode 100644 index 0000000..6a5eb6a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestDisposeTasks.cs @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Runtime.Evaluator.Task; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Bridge +{ + public class TestDisposeTasks : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TestDisposeTasks)); + + private const string ExitByReturn = "ExitByReturn"; + private const string ExitByException = "ExitByException"; + private const string TaskIsDisposed = "TaskIsDisposed"; + private const string TaskKilledByDriver = "TaskKilledByDriver"; + private const string TaskId = "TaskId"; + private const string ContextId = "ContextId"; + + /// <summary> + /// Test scenario: Task returned properly then disposed + /// </summary> + [Fact] + public void TestDisposeInTaskNormalReturnOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurations(1), typeof(TestDisposeTasks), 1, "TestDisposeTasks", "local", 
testFolder); + ValidateSuccessForLocalRuntime(1, 0, 0, testFolder); + var messages = new List<string> { TaskIsDisposed }; + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1); + CleanUp(testFolder); + } + + /// <summary> + /// Test scenario: Task is enforced to close after receiving close event + /// </summary> + [Fact] + public void TestDisposeInTaskExceptionOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurations(2), typeof(TestDisposeTasks), 1, "TestDisposeTasks", "local", testFolder); + ValidateSuccessForLocalRuntime(1, 1, 0, testFolder); + var messages = new List<string> { TaskIsDisposed }; + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1); + CleanUp(testFolder); + } + + /// <summary> + /// Test scenario: Dispose context while the task is still running. + /// </summary> + [Fact] + public void TestDisposeFromContextInRunningOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurations(3), typeof(TestDisposeTasks), 1, "TestDisposeTasks", "local", testFolder); + ValidateSuccessForLocalRuntime(1, 0, 0, testFolder); + var messages = new List<string>(); + messages.Add(TaskIsDisposed); + ValidateMessageSuccessfullyLogged(messages, "Node-*", EvaluatorStdout, testFolder, 1); + CleanUp(testFolder); + } + + /// <summary> + /// Driver configuration for the test driver + /// </summary> + /// <returns></returns> + public IConfiguration DriverConfigurations(int taskNumber) + { + var taskIdConfig = TangFactory.GetTang() + .NewConfigurationBuilder() + .BindStringNamedParam<TaskNumber>(taskNumber.ToString()) + .Build(); + + var driverConfig = DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<DisposeTaskTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<DisposeTaskTestDriver>.Class) + .Set(DriverConfiguration.OnContextActive, 
GenericType<DisposeTaskTestDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<DisposeTaskTestDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<DisposeTaskTestDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<DisposeTaskTestDriver>.Class) + .Build(); + + return Configurations.Merge(taskIdConfig, driverConfig); + } + + /// <summary> + /// Test driver + /// </summary> + private sealed class DisposeTaskTestDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IActiveContext>, + IObserver<ICompletedTask>, + IObserver<IFailedTask>, + IObserver<IRunningTask> + { + private readonly IEvaluatorRequestor _requestor; + private readonly string _taskNumber; + + [Inject] + private DisposeTaskTestDriver(IEvaluatorRequestor evaluatorRequestor, + [Parameter(typeof(TaskNumber))] string taskNumber) + { + _requestor = evaluatorRequestor; + _taskNumber = taskNumber; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().SetNumber(1).Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + value.SubmitContext( + ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, ContextId) + .Build()); + } + + public void OnNext(IActiveContext value) + { + value.SubmitTask(GetTaskConfigurationForCloseTask(TaskId + _taskNumber)); + } + + public void OnNext(ICompletedTask value) + { + Logger.Log(Level.Info, "Task completed: " + value.Id); + Assert.Equal(TaskId + "1", value.Id); + value.ActiveContext.Dispose(); + } + + /// <summary> + /// Verify when exception is shown in task, IFailedTask will be received here with the message set in the task + /// And verify the context associated with the failed task is the same as the context that the task was submitted + /// </summary> + /// <param name="value"></param> + public void OnNext(IFailedTask value) + { + Assert.Equal(TaskId + "2", value.Id); + + var failedException = 
ByteUtilities.ByteArraysToString(value.Data.Value); + var e = value.AsError(); + Logger.Log(Level.Error, "In IFailedTask: e.type: {0}, e.message: {1}.", e.GetType(), e.Message); + Logger.Log(Level.Error, "In IFailedTask: value.Data.Value: {0}, value.Message {1}.", failedException, value.Message); + + Assert.Equal(typeof(Exception), e.GetType()); + Assert.Equal(TaskKilledByDriver, e.Message); + Assert.Contains(TaskKilledByDriver, failedException); + + value.GetActiveContext().Value.Dispose(); + } + + /// <summary> + /// Task1: Close task and expect it to return from Call() + /// Task2: Close the task and expect it throw exception + /// Task3: Let context Dispose to close a running task and make sure the task is disposed + /// </summary> + /// <param name="value"></param> + public void OnNext(IRunningTask value) + { + Logger.Log(Level.Info, "Task running: " + value.Id); + switch (value.Id) + { + case TaskId + "1": + value.Dispose(Encoding.UTF8.GetBytes(ExitByReturn)); + break; + case TaskId + "2": + value.Dispose(Encoding.UTF8.GetBytes(ExitByException)); + break; + case TaskId + "3": + value.ActiveContext.Dispose(); + break; + } + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + private static IConfiguration GetTaskConfigurationForCloseTask(string taskId) + { + return TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, taskId) + .Set(TaskConfiguration.Task, GenericType<CloseAndDisposeTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<CloseAndDisposeTask>.Class) + .Build(); + } + } + + private sealed class CloseAndDisposeTask : ITask, IObserver<ICloseEvent> + { + private readonly CountdownEvent _suspendSignal = new CountdownEvent(1); + private int _disposed = 0; + + [Inject] + private CloseAndDisposeTask() + { + } + + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, "Hello in CloseAndDisposeTask"); + 
_suspendSignal.Wait(); + return null; + } + + public void Dispose() + { + if (Interlocked.Exchange(ref _disposed, 1) == 0) + { + Logger.Log(Level.Info, TaskIsDisposed); + } + } + + /// <summary> + /// Case 1: if message is ExitByReturn, signal the Call() to make the task return. + /// Case 2: if message is ExitByException, throw exception to expect the driver to receive FailedTask. + /// Otherwise do nothing, expecting TaskRuntime to dispose the task. + /// </summary> + /// <param name="closeEvent"></param> + public void OnNext(ICloseEvent closeEvent) + { + if (closeEvent.Value != null) + { + if (closeEvent.Value.IsPresent()) + { + string msg = Encoding.UTF8.GetString(closeEvent.Value.Value); + Logger.Log(Level.Info, "Closed event received in task:" + msg); + + if (msg.Equals(ExitByReturn)) + { + _suspendSignal.Signal(); + } + else if (msg.Equals(ExitByException)) + { + throw new Exception(TaskKilledByDriver); + } + } + else + { + Logger.Log(Level.Info, "Closed event received in task with no message"); + } + } + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + } + + [NamedParameter] + private class TaskNumber : Name<string> + { + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs index 2c766f2..c3521cd 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/IMRUCloseTaskTest.cs @@ -52,17 +52,17 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU [Fact] public void TestTaskCloseOnLocalRuntime() { - int chunkSize = 2; - int dims = 50; - int iterations = 200; - int mapperMemory = 
5120; - int updateTaskMemory = 5120; - int numTasks = 4; - string testFolder = DefaultRuntimeFolder + TestId; + const int chunkSize = 2; + const int dims = 50; + const int iterations = 200; + const int mapperMemory = 5120; + const int updateTaskMemory = 5120; + const int numTasks = 4; + var testFolder = DefaultRuntimeFolder + TestId; TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, testFolder); string[] lines = ReadLogFile(DriverStdout, "driver", testFolder); - int failedCount = GetMessageCount(lines, FailTaskMessage); - int completedCount = GetMessageCount(lines, CompletedTaskMessage); + var failedCount = GetMessageCount(lines, FailTaskMessage); + var completedCount = GetMessageCount(lines, CompletedTaskMessage); Assert.Equal(numTasks, failedCount + completedCount); CleanUp(testFolder); } @@ -80,12 +80,12 @@ namespace Org.Apache.REEF.Tests.Functional.IMRU [Fact(Skip = "Requires Yarn")] public void TestTaskCloseOnLocalRuntimeOnYarn() { - int chunkSize = 2; - int dims = 50; - int iterations = 200; - int mapperMemory = 5120; - int updateTaskMemory = 5120; - int numTasks = 4; + const int chunkSize = 2; + const int dims = 50; + const int iterations = 200; + const int mapperMemory = 5120; + const int updateTaskMemory = 5120; + const int numTasks = 4; TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory); } http://git-wip-us.apache.org/repos/asf/reef/blob/6cde8ce7/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 2e83ab6..e58f980 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -129,6 +129,7 @@ under the License. 
<Compile Include="Functional\ReefFunctionalTest.cs" /> <Compile Include="Functional\RuntimeName\RuntimeNameTask.cs" /> <Compile Include="Functional\RuntimeName\RuntimeNameTest.cs" /> + <Compile Include="Functional\Bridge\TestDisposeTasks.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Utility\TestDriverConfigGenerator.cs" /> <Compile Include="Utility\TestExceptions.cs" />
