Repository: reef
Updated Branches:
  refs/heads/master b8d2bad86 -> 3c7aba279


[REEF-1725] Set job done as task result if UpdateTask is done in IMRU

Currently, in the IMRU fault-tolerant system, when the master task is
done and, at the same time, the task receives a Close event caused by
another evaluator's failure, the system state is changed to
ShuttingDown, causing the system to retry.

This change makes the update task pass the "job done" information
back to the driver as the task result. The driver uses this information
to determine whether the job is done, regardless of whether any
failures happen at the same time.

JIRA:
   [REEF-1725](https://issues.apache.org/jira/browse/REEF-1725)

Pull request:
  This closes #1245


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3c7aba27
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3c7aba27
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3c7aba27

Branch: refs/heads/master
Commit: 3c7aba279288d1cbf4e9b72a9875e2839ede8a35
Parents: b8d2bad
Author: Julia Wang <[email protected]>
Authored: Fri Jan 27 15:31:39 2017 -0800
Committer: Mariia Mykhailova <[email protected]>
Committed: Thu Feb 2 14:53:52 2017 -0800

----------------------------------------------------------------------
 .../TestTaskManager.cs                          | 14 +++++++++++++-
 .../OnREEF/Driver/IMRUDriver.cs                 | 12 +++++++++++-
 .../OnREEF/Driver/TaskManager.cs                | 20 +++++++++++++++++---
 .../OnREEF/IMRUTasks/UpdateTaskHost.cs          | 11 +++++++++++
 4 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/3c7aba27/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
index d0cdda8..fa3567b 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestTaskManager.cs
@@ -205,7 +205,7 @@ namespace Org.Apache.REEF.IMRU.Tests
             
taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 
1));
             Assert.False(taskManager.IsMasterTaskCompletedRunning());
 
-            
taskManager.RecordCompletedTask(CreateMockCompletedTask(MasterTaskId));
+            
taskManager.RecordCompletedTask(CreateMockCompletedUpdateTask(MasterTaskId));
             Assert.True(taskManager.IsMasterTaskCompletedRunning());
 
             
taskManager.RecordCompletedTask(CreateMockCompletedTask(MapperTaskIdPrefix + 
2));
@@ -748,6 +748,18 @@ namespace Org.Apache.REEF.IMRU.Tests
         }
 
         /// <summary>
+        /// Creates a mock completed update task with the taskId specified
+        /// </summary>
+        /// <param name="taskId"></param>
+        /// <returns></returns>
+        private static ICompletedTask CreateMockCompletedUpdateTask(string 
taskId)
+        {
+            var completedTask = CreateMockCompletedTask(taskId);
+            
completedTask.Message.Returns(ByteUtilities.StringToByteArrays(TaskManager.UpdateTaskCompleted));
+            return completedTask;
+        }
+
+        /// <summary>
         /// Creates a mock IFailedEvaluator with the specified IFailedTask 
associated
         /// </summary>
         /// <param name="evaluatorId"></param>

http://git-wip-us.apache.org/repos/asf/reef/blob/3c7aba27/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 48256bd..d86e80f 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -479,6 +479,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         public void OnNext(ICompletedTask completedTask)
         {
             Logger.Log(Level.Info, "Received ICompletedTask {0}, with 
systemState {1} in retry# {2}.", completedTask.Id, _systemState.CurrentState, 
_numberOfRetries);
+            
             lock (_lock)
             {
                 if 
(_evaluatorsForceClosed.Contains(completedTask.ActiveContext.EvaluatorId))
@@ -501,7 +502,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     case SystemState.ShuttingDown:
                         // The task might be in running state or waiting for 
close, record the completed task
                         _taskManager.RecordCompletedTask(completedTask);
-                        TryRecovery();
+                        if (_taskManager.IsJobDone())
+                        {
+                            
_systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted);
+                            Logger.Log(Level.Info, "Master task is completed, 
systemState {0}", _systemState.CurrentState);
+                            DoneAction();
+                        }
+                        else
+                        {
+                            TryRecovery();
+                        }
                         break;
 
                     case SystemState.TasksCompleted:

http://git-wip-us.apache.org/repos/asf/reef/blob/3c7aba27/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
index 72e1d75..2eae234 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs
@@ -28,6 +28,7 @@ using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine;
 using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
 using Org.Apache.REEF.Tang.Exceptions;
 using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
@@ -61,6 +62,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         internal const string TaskKilledByDriver = "TaskKilledByDriver";
 
         /// <summary>
+        /// Result sent from UpdateTaskHost with the ICompletedTask message
+        /// </summary>
+        internal const string UpdateTaskCompleted = "UpdateTaskCompleted";
+
+        /// <summary>
         /// This Dictionary contains task information. The key is the Id of 
the Task, the value is TaskInfo which contains
         /// task state, task configuration, and active context that the task 
is running on. 
         /// </summary>
@@ -215,8 +221,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
 
         /// <summary>
         /// This method is called when receiving ICompletedTask event during 
task running or system shutting down.
-        /// If it is master task and if the master task was running, mark 
_masterTaskCompletedRunning true. That indicates 
-        /// master task has successfully completed, which means the system has 
got the result from master task. 
+        /// If it is master task, if the master task was running or if the 
task message indicates the task has been done,
+        /// mark _masterTaskCompletedRunning true. That indicates master task 
has successfully completed, which means the 
+        /// system has got the result from master task. 
         /// Removes the task from running tasks if it was running
         /// Changes the task state from RunningTask to CompletedTask if the 
task was running
         /// Change the task stat from TaskWaitingForClose to 
TaskClosedByDriver if the task was in TaskWaitingForClose state
@@ -225,7 +232,14 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
             if (completedTask.Id.Equals(_masterTaskId))
             {
-                if 
(GetTaskInfo(completedTask.Id).TaskState.CurrentState.Equals(TaskState.TaskRunning))
+                string message = "";
+                if (completedTask.Message != null)
+                {
+                    message = 
ByteUtilities.ByteArraysToString(completedTask.Message);
+                    Logger.Log(Level.Info, "UpdateTask message {0}", message);
+                }
+
+                if (message.Equals(TaskManager.UpdateTaskCompleted))
                 {
                     _masterTaskCompletedRunning = true;
                 }

http://git-wip-us.apache.org/repos/asf/reef/blob/3c7aba27/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index 0afb8d3..6c32c68 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -25,6 +25,7 @@ using Org.Apache.REEF.IMRU.OnREEF.Parameters;
 using Org.Apache.REEF.Network.Group.Operators;
 using Org.Apache.REEF.Network.Group.Task;
 using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Attributes;
 using Org.Apache.REEF.Utilities.Logging;
 
@@ -47,6 +48,11 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
         private readonly IIMRUResultHandler<TResult> _resultHandler;
 
         /// <summary>
+        /// It indicates if the update task has completed and result has been 
written.
+        /// </summary>
+        private bool _done;
+
+        /// <summary>
         /// </summary>
         /// <param name="updateTask">The UpdateTask hosted in this REEF 
Task.</param>
         /// <param name="groupCommunicationsClient">Used to setup the 
communications.</param>
@@ -114,6 +120,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                     if (updateResult.HasResult)
                     {
                         _resultHandler.HandleResult(updateResult.Result);
+                        _done = true;
                     }
                 }
                 catch (Exception e)
@@ -129,6 +136,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks
                 _dataAndControlMessageSender.Send(stopMessage);
             }
 
+            if (_done)
+            {
+                return 
ByteUtilities.StringToByteArrays(TaskManager.UpdateTaskCompleted);
+            }
             return null;
         }
 

Reply via email to