Repository: reef
Updated Branches:
  refs/heads/master 8ecb3e676 -> 5ed146d2d


[REEF-1374] Task failure heartbeat could be sent after Evaluator failure 
heartbeat

This addresses the issue by
  * Disabling Evaluator heartbeats after the Evaluator declares itself as DONE,
    KILLED, or FAILED.

JIRA:
  [REEF-1374](https://issues.apache.org/jira/browse/REEF-1374)

This closes #1000


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/5ed146d2
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/5ed146d2
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/5ed146d2

Branch: refs/heads/master
Commit: 5ed146d2d45196f1cf9862570eb5716986417173
Parents: 8ecb3e6
Author: Andrew Chung <[email protected]>
Authored: Thu May 12 11:38:57 2016 -0700
Committer: Julia Wang <[email protected]>
Committed: Tue May 17 13:31:13 2016 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/HeartBeatManager.cs       | 22 +++++++++++++++++++-
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |  3 ---
 2 files changed, 21 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/5ed146d2/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
index 2c8064e..0f3fbd6 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs
@@ -69,6 +69,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
 
         private readonly IInjectionFuture<ContextManager> _contextManager;
 
+        private bool _isCompletedHeartbeatQueued = false;
+
         // the queue can only contains the following:
         // 1. all failed heartbeats (regular and event-based) before entering 
RECOVERY state
         // 2. event-based heartbeats generated in RECOVERY state (since there 
will be no attempt to send regular heartbeat)
@@ -126,6 +128,18 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
         {
             lock (_queuedHeartbeats)
             {
+                // Do not send a heartbeat if Evaluator has already signaled 
that it was done.
+                if (_isCompletedHeartbeatQueued)
+                {
+                    LOGGER.Log(Level.Warning, "Evaluator trying to schedule a 
heartbeat after a completed heartbeat has already been scheduled or sent.");
+                    return;
+                }
+
+                if 
(IsEvaluatorStateCompleted(evaluatorHeartbeatProto.evaluator_status.state))
+                {
+                    _isCompletedHeartbeatQueued = true;
+                }
+
                 if (_evaluatorSettings.OperationState == 
EvaluatorOperationState.RECOVERY)
                 {
                     LOGGER.Log(Level.Warning, 
string.Format(CultureInfo.InvariantCulture, "In RECOVERY mode, heartbeat queued 
as [{0}]. ", evaluatorHeartbeatProto));
@@ -261,7 +275,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 {
                     LOGGER.Log(Level.Verbose, "Ignoring regular heartbeat 
since Evaluator operation state is [{0}] and runtime state is [{1}]. ", 
EvaluatorSettings.OperationState, EvaluatorRuntime.State);
 
-                    if (EvaluatorRuntime.State == State.DONE || 
EvaluatorRuntime.State == State.FAILED || EvaluatorRuntime.State == 
State.KILLED)
+                    // Do not try to recover if Evaluator is done.
+                    if (IsEvaluatorStateCompleted(EvaluatorRuntime.State))
                     {
                         return;
                     }
@@ -307,6 +322,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             throw new NotImplementedException();
         }
 
+        private static bool IsEvaluatorStateCompleted(State state)
+        {
+            return state == State.DONE || state == State.FAILED || state == 
State.KILLED;
+        }
+
         private static long CurrentTimeMilliSeconds()
         {
             // this is an implmenation to get current time milli second 
counted from Jan 1st, 1970

http://git-wip-us.apache.org/repos/asf/reef/blob/5ed146d2/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index d56731c..cfc8c3b 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -17,7 +17,6 @@
 
 using System;
 using System.Globalization;
-using System.Linq;
 using System.Threading;
 using Org.Apache.REEF.Common.Protobuf.ReefProtocol;
 using Org.Apache.REEF.Common.Tasks;
@@ -220,8 +219,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         {
             Logger.Log(Level.Info, "TaskRuntime::OnNext(ICloseEvent value)");
             _closeHandlerFuture.Get().OnNext(value);
-
-            // TODO: send a heartbeat
         }
 
         public void OnNext(ISuspendEvent value)

Reply via email to