Repository: reef Updated Branches: refs/heads/master b8d2bad86 -> 3c7aba279
[REEF-1725] Set job done as task result if UpdateTask is done in IMRU. Currently, in the IMRU fault-tolerant system, when the master task is done and, at the same time, the task receives a Close event caused by another evaluator's failure, the system state is changed to ShuttingDown, causing the system to retry. This change makes the update task pass "job done" information as its task result back to the driver. The driver uses this information to determine whether the job is done, regardless of whether any failures happen at the same time. JIRA: [REEF-1725](https://issues.apache.org/jira/browse/REEF-1725) Pull request: This closes #1245 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3c7aba27 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3c7aba27 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3c7aba27 Branch: refs/heads/master Commit: 3c7aba279288d1cbf4e9b72a9875e2839ede8a35 Parents: b8d2bad Author: Julia Wang <[email protected]> Authored: Fri Jan 27 15:31:39 2017 -0800 Committer: Mariia Mykhailova <[email protected]> Committed: Thu Feb 2 14:53:52 2017 -0800 ---------------------------------------------------------------------- .../TestTaskManager.cs | 14 +++++++++++++- .../OnREEF/Driver/IMRUDriver.cs | 12 +++++++++++- .../OnREEF/Driver/TaskManager.cs | 20 +++++++++++++++++--- .../OnREEF/IMRUTasks/UpdateTaskHost.cs | 11 +++++++++++ 4 files changed, 52 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/3c7aba27/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs index d0cdda8..fa3567b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs @@ 
-205,7 +205,7 @@ namespace Org.Apache.REEF.IMRU.Tests taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 1)); Assert.False(taskManager.IsMasterTaskCompletedRunning()); - taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId)); + taskManager.RecordCompletedTask(CreateMockCompletedUpdateTask(MasterTaskId)); Assert.True(taskManager.IsMasterTaskCompletedRunning()); taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 2)); @@ -748,6 +748,18 @@ namespace Org.Apache.REEF.IMRU.Tests } /// <summary> + /// Creates a mock completed update task with the taskId specified + /// </summary> + /// <param name="taskId"></param> + /// <returns></returns> + private static ICompletedTask CreateMockCompletedUpdateTask(string taskId) + { + var completedTask = CreateMockCompletedTask(taskId); + completedTask.Message.Returns(ByteUtilities.StringToByteArrays(TaskManager.UpdateTaskCompleted)); + return completedTask; + } + + /// <summary> /// Creates a mock IFailedEvaluator with the specified IFailedTask associated /// </summary> /// <param name="evaluatorId"></param> http://git-wip-us.apache.org/repos/asf/reef/blob/3c7aba27/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs index 48256bd..d86e80f 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs @@ -479,6 +479,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver public void OnNext(ICompletedTask completedTask) { Logger.Log(Level.Info, "Received ICompletedTask {0}, with systemState {1} in retry# {2}.", completedTask.Id, _systemState.CurrentState, _numberOfRetries); + lock (_lock) { if (_evaluatorsForceClosed.Contains(completedTask.ActiveContext.EvaluatorId)) @@ -501,7 +502,16 @@ namespace 
Org.Apache.REEF.IMRU.OnREEF.Driver case SystemState.ShuttingDown: // The task might be in running state or waiting for close, record the completed task _taskManager.RecordCompletedTask(completedTask); - TryRecovery(); + if (_taskManager.IsJobDone()) + { + _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted); + Logger.Log(Level.Info, "Master task is completed, systemState {0}", _systemState.CurrentState); + DoneAction(); + } + else + { + TryRecovery(); + } break; case SystemState.TasksCompleted: http://git-wip-us.apache.org/repos/asf/reef/blob/3c7aba27/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs index 72e1d75..2eae234 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs @@ -28,6 +28,7 @@ using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; @@ -61,6 +62,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver internal const string TaskKilledByDriver = "TaskKilledByDriver"; /// <summary> + /// Result sent from UpdateTaskHost with the ICompletedTask message + /// </summary> + internal const string UpdateTaskCompleted = "UpdateTaskCompleted"; + + /// <summary> /// This Dictionary contains task information. The key is the Id of the Task, the value is TaskInfo which contains /// task state, task configuration, and active context that the task is running on. 
/// </summary> @@ -215,8 +221,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <summary> /// This method is called when receiving ICompletedTask event during task running or system shutting down. - /// If it is master task and if the master task was running, mark _masterTaskCompletedRunning true. That indicates - /// master task has successfully completed, which means the system has got the result from master task. + /// If it is master task, if the master task was running or if the task message indicates the task has been done, + /// mark _masterTaskCompletedRunning true. That indicates master task has successfully completed, which means the + /// system has got the result from master task. /// Removes the task from running tasks if it was running /// Changes the task state from RunningTask to CompletedTask if the task was running /// Change the task stat from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state @@ -225,7 +232,14 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver { if (completedTask.Id.Equals(_masterTaskId)) { - if (GetTaskInfo(completedTask.Id).TaskState.CurrentState.Equals(TaskState.TaskRunning)) + string message = ""; + if (completedTask.Message != null) + { + message = ByteUtilities.ByteArraysToString(completedTask.Message); + Logger.Log(Level.Info, "UpdateTask message {0}", message); + } + + if (message.Equals(TaskManager.UpdateTaskCompleted)) { _masterTaskCompletedRunning = true; } http://git-wip-us.apache.org/repos/asf/reef/blob/3c7aba27/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs index 0afb8d3..6c32c68 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs @@ -25,6 +25,7 @@ using 
Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; @@ -47,6 +48,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks private readonly IIMRUResultHandler<TResult> _resultHandler; /// <summary> + /// It indicates if the update task has completed and result has been written. + /// </summary> + private bool _done; + + /// <summary> /// </summary> /// <param name="updateTask">The UpdateTask hosted in this REEF Task.</param> /// <param name="groupCommunicationsClient">Used to setup the communications.</param> @@ -114,6 +120,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks if (updateResult.HasResult) { _resultHandler.HandleResult(updateResult.Result); + _done = true; } } catch (Exception e) @@ -129,6 +136,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks _dataAndControlMessageSender.Send(stopMessage); } + if (_done) + { + return ByteUtilities.StringToByteArrays(TaskManager.UpdateTaskCompleted); + } return null; }
