Repository: reef
Updated Branches:
  refs/heads/master 07e654977 -> bcf7ff881


[REEF-1424] Validate Task StartHandler failure => FailedTask Event

This addresses the issue by:
  * Consolidating the Task Exception handling logic in TaskRuntime.cs instead
    of in TaskLifeCycle or TaskStatus.
  * Adding TaskStartExceptionTest.
  * Adding TaskStartHandlerException to indicate that an Exception happened in
    the TaskStartHandler.

JIRA:
  [REEF-1424](https://issues.apache.org/jira/browse/REEF-1424)

This closes #1043


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/bcf7ff88
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/bcf7ff88
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/bcf7ff88

Branch: refs/heads/master
Commit: bcf7ff8813f867eb67c1b713569f19e0939341e5
Parents: 07e6549
Author: Andrew Chung <[email protected]>
Authored: Fri Jun 10 11:25:52 2016 -0700
Committer: Julia Wang <[email protected]>
Committed: Thu Jun 16 19:48:11 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |   1 +
 .../Runtime/Evaluator/Task/TaskLifeCycle.cs     |  15 +-
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |   4 +
 .../Evaluator/Task/TaskStartHandlerException.cs |  28 +++
 .../Runtime/Evaluator/Task/TaskStatus.cs        |  28 +--
 .../Failure/User/TaskStartExceptionTest.cs      | 207 +++++++++++++++++++
 .../Failure/User/TaskStopExceptionTest.cs       |   1 -
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 8 files changed, 258 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index 3058b91..3582f00 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -163,6 +163,7 @@ under the License.
     <Compile Include="Runtime\Evaluator\Task\TaskClientCodeException.cs" />
     <Compile Include="Runtime\Evaluator\Task\TaskLifeCycle.cs" />
     <Compile Include="Runtime\Evaluator\Task\TaskRuntime.cs" />
+    <Compile Include="Runtime\Evaluator\Task\TaskStartHandlerException.cs" />
     <Compile Include="Runtime\Evaluator\Task\TaskStartImpl.cs" />
     <Compile Include="Runtime\Evaluator\Task\TaskState.cs" />
     <Compile Include="Runtime\Evaluator\Task\TaskStatus.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
index 687cb9c..9eb9ea5 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskLifeCycle.cs
@@ -51,13 +51,20 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 
         public void Start() 
         {
-            if (Interlocked.Exchange(ref _startHasBeenInvoked, 1) == 0)
+            try
             {
-                foreach (var startHandler in _taskStartHandlers)
+                if (Interlocked.Exchange(ref _startHasBeenInvoked, 1) == 0)
                 {
-                    startHandler.OnNext(_taskStart);
+                    foreach (var startHandler in _taskStartHandlers)
+                    {
+                        startHandler.OnNext(_taskStart);
+                    }
                 }
             }
+            catch (Exception e)
+            {
+                throw new TaskStartHandlerException("Encountered Exception in 
TaskStartHandler.", e);
+            }
         }
 
         public void Stop() 
@@ -74,7 +81,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
             }
             catch (Exception e)
             {
-                throw new TaskStopHandlerException("Encountered Exception on 
TaskStopHandler.", e);
+                throw new TaskStopHandlerException("Encountered Exception in 
TaskStopHandler.", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 8a13d63..b14e3b9 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -110,6 +110,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                             "Task running result:\r\n" + 
System.Text.Encoding.Default.GetString(result));
                     }
                 }
+                catch (TaskStartHandlerException e)
+                {
+                    _currentStatus.SetException(e.InnerException);
+                }
                 catch (TaskStopHandlerException e)
                 {
                     _currentStatus.SetException(e.InnerException);

http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartHandlerException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartHandlerException.cs
 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartHandlerException.cs
new file mode 100644
index 0000000..07e8b10
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStartHandlerException.cs
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+
+namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
+{
+    internal sealed class TaskStartHandlerException : Exception
+    {
+        internal TaskStartHandlerException(string message, Exception inner) : 
base(message, inner)
+        {
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
index 0347033..5e4ca2f 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskStatus.cs
@@ -143,17 +143,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 LOGGER.Log(Level.Verbose, "TaskStatus::SetInit");
                 if (_state == TaskState.Init)
                 {
-                    try
-                    {
-                        _taskLifeCycle.Start();
-                        LOGGER.Log(Level.Info, "Sending task INIT heartbeat");
-                        Heartbeat();
-                    }
-                    catch (Exception e)
-                    {
-                        Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Error, "Cannot set task status to INIT.", LOGGER);
-                        SetException(e);
-                    }
+                    LOGGER.Log(Level.Verbose, "Sending task INIT heartbeat");
+                    Heartbeat();
                 }
             }
         }
@@ -165,17 +156,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 LOGGER.Log(Level.Verbose, "TaskStatus::SetRunning");
                 if (_state == TaskState.Init)
                 {
-                    try
-                    {
-                        State = TaskState.Running;
-                        LOGGER.Log(Level.Info, "Sending task Running 
heartbeat");
-                        Heartbeat();
-                    }
-                    catch (Exception e)
-                    {
-                        Utilities.Diagnostics.Exceptions.Caught(e, 
Level.Error, "Cannot set task status to running.", LOGGER);
-                        SetException(e);
-                    }
+                    _taskLifeCycle.Start();
+                    State = TaskState.Running;
+                    LOGGER.Log(Level.Verbose, "Sending task Running 
heartbeat");
+                    Heartbeat();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStartExceptionTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStartExceptionTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStartExceptionTest.cs
new file mode 100644
index 0000000..a218aef
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStartExceptionTest.cs
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.Serialization;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional.Common.Task;
+using Org.Apache.REEF.Tests.Functional.Common.Task.Handlers;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Failure.User
+{
+    /// <summary>
+    /// This test class contains a test that validates that an Exception in 
+    /// a TaskStartHandler triggers an <see cref="IFailedTask"/> event.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public sealed class TaskStartExceptionTest : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TaskStartExceptionTest));
+
+        private const string TaskStartExceptionMessage = 
"TaskStartExceptionMessage";
+        private const string InitialTaskMessage = "InitialTaskMessage";
+        private const string ResubmitTaskMessage = "ResubmitTaskMessage";
+        private const string FailedTaskReceived = "FailedTaskReceived";
+        private const string CompletedTaskReceived = "CompletedTaskReceived";
+
+        /// <summary>
+        /// This test validates that an Exception in the TaskStartHandler 
causes a FailedTask
+        /// event in the Driver, and that a new Task can be submitted on the 
original Context.
+        /// </summary>
+        [Fact]
+        public void TestStopTaskWithExceptionOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, 
GenericType<TaskStartExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<TaskStartExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskCompleted, 
GenericType<TaskStartExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskFailed, 
GenericType<TaskStartExceptionTestDriver>.Class)
+                .Build(), typeof(TaskStartExceptionTestDriver), 1, 
"testStartTaskWithExceptionOnLocalRuntime", "local", testFolder);
+
+            var driverMessages = new List<string>
+            {
+                FailedTaskReceived,
+                CompletedTaskReceived
+            };
+
+            ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, 
testFolder, 1);
+
+            // Validate that the first Task never starts.
+            ValidateMessageSuccessfullyLogged(
+                new List<string> { InitialTaskMessage }, "Node-*", 
EvaluatorStdout, testFolder, 0);
+
+            ValidateMessageSuccessfullyLogged(
+                new List<string> { ResubmitTaskMessage }, "Node-*", 
EvaluatorStdout, testFolder, 1);
+
+            CleanUp(testFolder);
+        }
+
+        private sealed class TaskStartExceptionTestDriver :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<ICompletedTask>,
+            IObserver<IFailedTask>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+
+            [Inject]
+            private TaskStartExceptionTestDriver(IEvaluatorRequestor requestor)
+            {
+                _requestor = requestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                // submit the first Task.
+                value.SubmitTask(TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, "TaskID")
+                        .Set(TaskConfiguration.Task, 
GenericType<TaskStartExceptionTask>.Class)
+                        .Set(TaskConfiguration.OnTaskStart, 
GenericType<TaskStartHandlerWithException>.Class)
+                        .Build());
+            }
+
+            public void OnNext(ICompletedTask value)
+            {
+                // Should only receive one CompletedTask, as validated.
+                Logger.Log(Level.Info, CompletedTaskReceived);
+                value.ActiveContext.Dispose();
+            }
+
+            public void OnNext(IFailedTask value)
+            {
+                // Check that Exceptions are deserialized correctly.
+                var ex = value.AsError();
+                if (ex == null)
+                {
+                    throw new Exception("Exception was not expected to be 
null.");
+                }
+
+                var taskStartEx = ex as TaskStartExceptionTestException;
+
+                Assert.True(taskStartEx != null, "Expected Exception to be of 
type TaskStartExceptionTestException, but instead got type " + 
ex.GetType().Name);
+                
Assert.True(taskStartEx.Message.Equals(TaskStartExceptionMessage),
+                    "Expected message to be " + TaskStartExceptionMessage + " 
but instead got " + taskStartEx.Message + ".");
+
+                Logger.Log(Level.Info, FailedTaskReceived);
+
+                // Submit the new Task to verify that the original Context 
accepts new Tasks.
+                value.GetActiveContext().Value.SubmitTask(
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, "TaskID")
+                        .Set(TaskConfiguration.Task, 
GenericType<TaskStartExceptionResubmitTask>.Class)
+                        .Build());
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class TaskStartExceptionTask : LoggingTask
+        {
+            [Inject]
+            private TaskStartExceptionTask()
+                : base(InitialTaskMessage)
+            {
+            }
+        }
+
+        /// <summary>
+        /// A simple Task for Task resubmission validation on a Context with a 
previous Task
+        /// that failed on a Task StartHandler.
+        /// </summary>
+        private sealed class TaskStartExceptionResubmitTask : LoggingTask
+        {
+            [Inject]
+            private TaskStartExceptionResubmitTask()
+                : base(ResubmitTaskMessage)
+            {
+            }
+        }
+
+        /// <summary>
+        /// Throws a test Exception on Task Start to trigger a task failure.
+        /// </summary>
+        private sealed class TaskStartHandlerWithException : 
ExceptionThrowingHandler<ITaskStart>
+        {
+            [Inject]
+            private TaskStartHandlerWithException() :
+                base(new 
TaskStartExceptionTestException(TaskStartExceptionMessage))
+            {
+            }
+        }
+
+        /// <summary>
+        /// A Serializable Exception to verify that the Exception is 
deserialized correctly.
+        /// </summary>
+        [Serializable]
+        private sealed class TaskStartExceptionTestException : Exception
+        {
+            public TaskStartExceptionTestException(string message)
+                : base(message)
+            {
+            }
+
+            private TaskStartExceptionTestException(SerializationInfo info, 
StreamingContext context)
+                : base(info, context)
+            {
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
index 84aba1f..da2ba9e 100644
--- 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskStopExceptionTest.cs
@@ -69,7 +69,6 @@ namespace Org.Apache.REEF.Tests.Functional.Failure.User
             };
 
             ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, 
testFolder, 1);
-            ValidateMessageSuccessfullyLogged(driverMessages, "driver", 
DriverStdout, testFolder, 1);
 
             var evaluatorMessages = new List<string> { InitialTaskMessage, 
ResubmitTaskMessage };
             ValidateMessageSuccessfullyLogged(evaluatorMessages, "Node-*", 
EvaluatorStdout, testFolder, 1);

http://git-wip-us.apache.org/repos/asf/reef/blob/bcf7ff88/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 997d05d..6dde363 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -88,6 +88,7 @@ under the License.
     <Compile Include="Functional\Common\Task\Handlers\LoggingHandler.cs" />
     <Compile Include="Functional\Common\Task\LoggingTask.cs" />
     <Compile 
Include="Functional\Common\Task\Handlers\ExceptionThrowingHandler.cs" />
+    <Compile Include="Functional\Failure\User\TaskStartExceptionTest.cs" />
     <Compile 
Include="Functional\Failure\User\UnhandledThreadExceptionInTaskTest.cs" />
     <Compile Include="Functional\Driver\DriverTestStartHandler.cs" />
     <Compile 
Include="Functional\Failure\BasePoisonedEvaluatorWithActiveContextDriver.cs" />

Reply via email to