Repository: reef
Updated Branches:
  refs/heads/master ffa976a70 -> be87fe58f


[REEF-1343] Fix events received in case of evaluator failure

This addresses the issue by:
  * Invoking RuntimeStop on Exception in RuntimeClock.
  * Fixing the handling of Exceptions in EvaluatorRuntime.

JIRA:
  [REEF-1343](https://issues.apache.org/jira/browse/REEF-1343)

Pull Request:
  This closes #961


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/be87fe58
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/be87fe58
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/be87fe58

Branch: refs/heads/master
Commit: be87fe58f8cc80ad84cd4494812fcaa310f4a31e
Parents: ffa976a
Author: Andrew Chung <[email protected]>
Authored: Wed Apr 20 14:12:23 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Apr 25 14:06:37 2016 -0700

----------------------------------------------------------------------
 .../Runtime/Evaluator/EvaluatorRuntime.cs       | 54 +++++++++------
 .../Functional/FaultTolerant/PoisonTest.cs      | 70 +++++++++++++-------
 .../Time/Runtime/RuntimeClock.cs                | 45 ++++++++-----
 3 files changed, 111 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/be87fe58/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
index 45ff233..495b2c2 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorRuntime.cs
@@ -80,7 +80,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 Logger.Log(Level.Info, "Handle Evaluator control message");
                 if (!message.identifier.Equals(_evaluatorId, 
StringComparison.OrdinalIgnoreCase))
                 {
-                    Handle(new InvalidOperationException(
+                    OnException(new InvalidOperationException(
                         string.Format(CultureInfo.InvariantCulture, 
"Identifier mismatch: message for evaluator id[{0}] sent to evaluator id[{1}]", 
message.identifier, _evaluatorId)));
                 }
                 else if (_state == State.DONE)
@@ -94,12 +94,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                     }
                     else
                     {
-                        Handle(new InvalidOperationException("Received a 
control message from Driver after Evaluator is done."));
+                        OnException(new InvalidOperationException("Received a 
control message from Driver after Evaluator is done."));
                     }
                 }
                 else if (_state != State.RUNNING)
                 {
-                    Handle(new InvalidOperationException(
+                    OnException(new InvalidOperationException(
                         string.Format(CultureInfo.InvariantCulture, "Evaluator 
received a control message but its state is not {0} but rather {1}", 
State.RUNNING, _state)));
                 }
                 else
@@ -120,7 +120,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                         catch (Exception e)
                         {
                             Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Error, Logger);
-                            Handle(e);
+                            OnException(e);
                             Utilities.Diagnostics.Exceptions.Throw(new 
InvalidOperationException(e.ToString(), e), Logger);
                         }
                     }
@@ -164,7 +164,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
                 catch (Exception e)
                 {
                     Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, 
Logger);
-                    Handle(e);
+                    OnException(e);
                 }
             }
         }
@@ -172,22 +172,33 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
         public void OnNext(RuntimeStop runtimeStop)
         {
             Logger.Log(Level.Info, "Runtime stop");
-            _contextManager.Dispose();
 
             if (_state == State.RUNNING)
             {
-                _state = State.DONE;
-                _heartBeatManager.OnNext();
-            }
-            try
-            {
-                _evaluatorControlChannel.Dispose();
+                const string msg = "RuntimeStopHandler invoked in state 
RUNNING.";
+                if (runtimeStop.Exception != null)
+                {
+                    OnException(new SystemException(msg, 
runtimeStop.Exception));
+                }
+                else
+                {
+                    OnException(new SystemException(msg));
+                }
             }
-            catch (Exception e)
+            else
             {
-                Utilities.Diagnostics.Exceptions.CaughtAndThrow(new 
InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, 
"Exception during shut down.", Logger);
+                try
+                {
+                    _contextManager.Dispose();
+                    _evaluatorControlChannel.Dispose();
+                }
+                catch (Exception e)
+                {
+                    Utilities.Diagnostics.Exceptions.CaughtAndThrow(new 
InvalidOperationException("Cannot stop evaluator properly", e), Level.Error, 
"Exception during shut down.", Logger);
+                }
             }
-            Logger.Log(Level.Info, "EvaluatorRuntime shutdown complete");      
  
+
+            Logger.Log(Level.Info, "EvaluatorRuntime shutdown complete");      
         }
 
         public void OnNext(REEFMessage value)
@@ -199,27 +210,30 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator
             }
         }
 
-        private void Handle(Exception e)
+        private void OnException(Exception e)
         {
             lock (_heartBeatManager)
             {
-                Logger.Log(Level.Error, 
string.Format(CultureInfo.InvariantCulture, "evaluator {0} failed with 
exception", _evaluatorId), e);
+                Logger.Log(Level.Error, "Evaluator {0} failed with exception 
{1}.", _evaluatorId, e);
                 _state = State.FAILED;
-                string errorMessage = string.Format(
+
+                var errorMessage = string.Format(
                         CultureInfo.InvariantCulture,
                         "failed with error [{0}] with message [{1}] and stack 
trace [{2}]",
                         e,
                         e.Message,
                         e.StackTrace);
-                EvaluatorStatusProto evaluatorStatusProto = new 
EvaluatorStatusProto()
+
+                var evaluatorStatusProto = new EvaluatorStatusProto()
                 {
                     evaluator_id = _evaluatorId,
                     error = ByteUtilities.StringToByteArrays(errorMessage),
                     state = _state
                 };
+
                 _heartBeatManager.OnNext(evaluatorStatusProto);
                 _contextManager.Dispose();
-            }       
+            }
         }
 
         public void OnError(Exception error)

http://git-wip-us.apache.org/repos/asf/reef/blob/be87fe58/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
index 7aab1ac..b0d1c40 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
@@ -16,6 +16,7 @@
 // under the License.
 
 using System;
+using System.Linq;
 using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Context;
@@ -27,8 +28,8 @@ using Org.Apache.REEF.Utilities.Logging;
 using Xunit;
 using System.Threading;
 using Org.Apache.REEF.Common.Context;
-using Org.Apache.REEF.Common.Events;
 using Org.Apache.REEF.Common.Poison;
+using Org.Apache.REEF.Common.Tasks.Events;
 using Org.Apache.REEF.Driver.Task;
 using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
@@ -43,10 +44,11 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
         private const string Prefix = "Poison: ";
         private const string FailedEvaluatorMessage = "I have succeeded in 
seeing a failed evaluator.";
         private const string TaskId = "1234567";
+        private const string ContextId = "ContextID";
 
         [Fact]
         [Trait("Description", "Test Poison functionality by injecting fault in 
context start handler.")]
-        public void TestPoisonedEvaluatorStartHanlder()
+        public void TestPoisonedEvaluatorStartHandler()
         {
             string testFolder = DefaultRuntimeFolder + TestId;
             TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 
1, "poisonedEvaluatorStartTest", "local", testFolder);
@@ -87,45 +89,67 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
 
             public void OnNext(IAllocatedEvaluator value)
             {
-                var c1 = ContextConfiguration.ConfigurationModule
-                    .Set(ContextConfiguration.Identifier, "ContextID")
-                    .Set(ContextConfiguration.OnContextStart, 
GenericType<PoisonedEventHandler<IContextStart>>.Class)
-                    .Build();
-
-                var c2 = TangFactory.GetTang().NewConfigurationBuilder()
-                    .BindIntNamedParam<CrashTimeout>("500")
-                    .BindIntNamedParam<CrashMinDelay>("100")
-                    .BindNamedParameter<CrashProbability, 
double>(GenericType<CrashProbability>.Class, "1.0")
-                    .Build();
-
-                value.SubmitContext(Configurations.Merge(c1, c2));
+                value.SubmitContext(ContextConfiguration.ConfigurationModule
+                    .Set(ContextConfiguration.Identifier, ContextId)
+                    .Build());
             }
 
             public void OnNext(IActiveContext value)
             {
-                value.SubmitTask(TaskConfiguration.ConfigurationModule
+                var taskConfig = TaskConfiguration.ConfigurationModule
                     .Set(TaskConfiguration.Identifier, TaskId)
                     .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class)
-                    .Build());
+                    .Set(TaskConfiguration.OnTaskStart, 
GenericType<PoisonedEventHandler<ITaskStart>>.Class)
+                    .Build();
+
+                var poisonConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindIntNamedParam<CrashTimeout>("500")
+                    .BindIntNamedParam<CrashMinDelay>("100")
+                    .BindNamedParameter<CrashProbability, 
double>(GenericType<CrashProbability>.Class, "1.0")
+                    .Build();
+                
+                value.SubmitTask(Configurations.Merge(taskConfig, 
poisonConfig));
             }
 
             public void OnNext(IFailedEvaluator value)
             {
                 Logger.Log(Level.Error, FailedEvaluatorMessage);
-                if (value.FailedTask.Value == null)
+                if (value.FailedTask.Value == null || 
!value.FailedTask.IsPresent())
+                {
+                    throw new Exception("No failed Task associated with failed 
Evaluator");
+                }
+
+                if (value.FailedTask.Value.Id != TaskId)
+                {
+                    throw new Exception("Failed Task ID returned " + 
value.FailedTask.Value.Id
+                        + ", was expecting Task ID " + TaskId);
+                }
+
+                Logger.Log(Level.Info, "Received all expected failed Tasks.");
+
+                const string expectedStr = "expected a single Context with 
Context ID " + ContextId + ".";
+
+                if (value.FailedContexts == null)
                 {
-                    // TODO[JIRA REEF-1343]: fail the test if there's no 
failed task
-                    Logger.Log(Level.Error, "No failed task associated with 
failed evaluator");
+                    throw new Exception("No Context was present but " + 
expectedStr);
                 }
-                else
+
+                if (value.FailedContexts.Count != 1)
                 {
-                    Logger.Log(Level.Error, "Failed task id '" + 
value.FailedTask.Value.Id + "'");
+                    throw new Exception("Collection of failed Contexts 
contains " + value.FailedContexts.Count + " failed Contexts but only " + 
expectedStr);
                 }
+                
+                if (!value.FailedContexts.Select(ctx => 
ctx.Id).Contains(ContextId))
+                {
+                    throw new Exception("Collection of failed Contexts does 
not contain expected Context ID " + ContextId + ".");
+                }
+
+                Logger.Log(Level.Info, "Received all expected failed 
Contexts.");
             }
+
             public void OnNext(ICompletedTask value)
             {
-                // TODO[JIRA REEF-1343]: fail the test if receive 
ICompletedTask after failed evaluator
-                Logger.Log(Level.Info, "ICompletedTask");
+                throw new Exception("A completed task was not expected.");
             }
 
             public void OnError(Exception error)

http://git-wip-us.apache.org/repos/asf/reef/blob/be87fe58/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs 
b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
index 98be51b..8c14c51 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
@@ -21,6 +21,7 @@ using System.Linq;
 using System.Threading;
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Implementations.InjectionPlan;
+using Org.Apache.REEF.Utilities;
 using Org.Apache.REEF.Utilities.Collections;
 using Org.Apache.REEF.Utilities.Diagnostics;
 using Org.Apache.REEF.Utilities.Logging;
@@ -151,29 +152,43 @@ namespace Org.Apache.REEF.Wake.Time.Runtime
         public void Run()
         {
             SubscribeHandlers();
-            _handlers.OnNext(new RuntimeStart(_timer.CurrentTime));
-            _handlers.OnNext(new StartTime(_timer.CurrentTime));
 
-            while (true)
+            var runtimeException = Optional<Exception>.Empty();
+            try
             {
-                lock (_schedule)
+                _handlers.OnNext(new RuntimeStart(_timer.CurrentTime));
+                _handlers.OnNext(new StartTime(_timer.CurrentTime));
+
+                while (true)
                 {
-                    if (IsIdle())
+                    lock (_schedule)
                     {
-                        _handlers.OnNext(new IdleClock(_timer.CurrentTime));
-                    }
-                    
-                    // Blocks and releases lock until it receives the next 
event
-                    Time alarm = GetNextEvent();
-                    ProcessEvent(alarm);
+                        if (IsIdle())
+                        {
+                            _handlers.OnNext(new 
IdleClock(_timer.CurrentTime));
+                        }
 
-                    if (alarm is StopTime)
-                    {
-                        break;
+                        // Blocks and releases lock until it receives the next 
event
+                        Time alarm = GetNextEvent();
+                        ProcessEvent(alarm);
+
+                        if (alarm is StopTime)
+                        {
+                            break;
+                        }
                     }
                 }
             }
-            _handlers.OnNext(new RuntimeStop(_timer.CurrentTime));
+            catch (Exception e)
+            {
+                runtimeException = Optional<Exception>.Of(new 
SystemException("Caught Exception in clock, failing the Evaluator.", e));
+            }
+
+            var runtimeStop = runtimeException.IsPresent()
+                ? new RuntimeStop(_timer.CurrentTime, runtimeException.Value)
+                : new RuntimeStop(_timer.CurrentTime);
+
+            _handlers.OnNext(runtimeStop);
         }
 
         /// <summary>

Reply via email to