Repository: reef
Updated Branches:
  refs/heads/master df1a226d6 -> 9ee932462


[REEF-1429] Validate Task Suspend failure => FailedEvaluator Event

This addressed the issue by
  * Adding a test to validate that a failure in task suspension triggers a 
FailedEvaluator.

JIRA:
  [REEF-1429](https://issues.apache.org/jira/browse/REEF-1429)

This closes #1066


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/9ee93246
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/9ee93246
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/9ee93246

Branch: refs/heads/master
Commit: 9ee932462ae4946514e2c4f601e3f6528ca3af13
Parents: df1a226
Author: Andrew Chung <[email protected]>
Authored: Fri Jun 17 13:52:32 2016 -0700
Committer: Julia Wang <[email protected]>
Committed: Fri Jul 1 17:51:17 2016 -0700

----------------------------------------------------------------------
 lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp  |  19 +--
 lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h    |   4 -
 .../Org.Apache.REEF.Common.csproj               |   1 -
 .../Evaluator/Task/TaskClientCodeException.cs   |  18 +--
 .../Runtime/Evaluator/Task/TaskRuntime.cs       |  25 +--
 .../Exceptions/TaskSuspendHandlerException.cs   |  43 -----
 .../Failure/User/TaskSuspendExceptionTest.cs    | 159 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   3 +-
 8 files changed, 174 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
index 66e1a01..69c28bc 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
@@ -109,19 +109,14 @@ array<byte>^ ManagedByteArrayFromJavaByteArray(
 jbyteArray JavaByteArrayFromManagedByteArray(
   JNIEnv *env,
   array<byte>^ managedByteArray) {
-  jbyteArray javaByteArray = env->NewByteArray(managedByteArray->Length);
-  pin_ptr<Byte> p = &managedByteArray[0];
-  env->SetByteArrayRegion(javaByteArray, 0, managedByteArray->Length, (jbyte*) 
p);
-  return javaByteArray;
-}
+  if (managedByteArray != nullptr) {
+    jbyteArray javaByteArray = env->NewByteArray(managedByteArray->Length);
+    pin_ptr<Byte> p = &managedByteArray[0];
+    env->SetByteArrayRegion(javaByteArray, 0, managedByteArray->Length, 
(jbyte*)p);
+    return javaByteArray;
+  }
 
-jlongArray JavaLongArrayFromManagedLongArray(
-  JNIEnv *env,
-  array<unsigned long long>^ managedLongArray) {
-  jlongArray javaLongArray = env->NewLongArray(managedLongArray->Length);
-  pin_ptr<unsigned long long> p = &managedLongArray[0];
-  env->SetLongArrayRegion(javaLongArray, 0, managedLongArray->Length, (jlong*) 
p);
-  return javaLongArray;
+  return NULL;
 }
 
 JNIEnv* RetrieveEnv(JavaVM* jvm) {

http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h 
b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
index 7d3baa5..aa50ddd 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
@@ -56,10 +56,6 @@ jbyteArray JavaByteArrayFromManagedByteArray(
   JNIEnv *env,
   array<byte>^ managedByteArray);
 
-jlongArray JavaLongArrayFromManagedLongArray(
-  JNIEnv *env,
-  array<unsigned long long>^ managedLongArray);
-
 JNIEnv* RetrieveEnv(JavaVM* jvm);
 
 String^ FormatJavaExceptionMessage(String^ errorMessage, Exception^ exception);

http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index d438ad1..29955d8 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -188,7 +188,6 @@ under the License.
     <Compile Include="Tasks\Events\ITaskStart.cs" />
     <Compile Include="Tasks\Events\ITaskStop.cs" />
     <Compile Include="Tasks\Exceptions\TaskCloseHandlerNotBoundException.cs" />
-    <Compile Include="Tasks\Exceptions\TaskSuspendHandlerException.cs" />
     <Compile Include="Tasks\IDriverConnectionMessageHandler.cs" />
     <Compile Include="Tasks\IDriverMessageHandler.cs" />
     <Compile Include="Tasks\IDriverConnectionMessage.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs
 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs
index 47ceb8d..2b12c7c 100644
--- 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs
+++ 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs
@@ -23,7 +23,7 @@ using Org.Apache.REEF.Common.Exceptions;
 namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
 {
     /// <summary>
-    /// An Exception thrown when Task operations (Start, Stop, Suspend) fail.
+    /// An Exception thrown when Task creation fails.
     /// </summary>
     [Serializable]
     public sealed class TaskClientCodeException : Exception
@@ -74,22 +74,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         }
 
         /// <summary>
-        /// construct the exception that caused the Task to fail
-        /// </summary>
-        /// <param name="taskId"> the id of the failed task.</param>
-        /// <param name="contextId"> the id of the context the failed Task was 
executing in.</param>
-        /// <param name="message"> the error message </param>
-        private TaskClientCodeException(
-                string taskId,
-                string contextId,
-                string message)
-            : base(message)
-        {
-            _taskId = taskId;
-            _contextId = contextId;
-        }
-
-        /// <summary>
         /// Constructor used for serialization.
         /// </summary>
         private TaskClientCodeException(SerializationInfo info, 
StreamingContext context)

http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
index 2feccd2..330c7b4 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs
@@ -217,17 +217,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
                 Logger.Log(Level.Warning, 
string.Format(CultureInfo.InvariantCulture, "Trying to suspend an task that is 
in {0} state. Ignored.", _currentStatus.State));
                 return;
             }
-            try
-            {
-                OnNext(new SuspendEventImpl(message));
-                _currentStatus.SetSuspendRequested();
-            }
-            catch (Exception e)
-            {
-                Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error 
during Suspend.", Logger);
-                _currentStatus.SetException(
-                    TaskClientCodeException.Create(TaskId, ContextId, "Error 
during Suspend().", e));
-            }
+            
+            // An Exception in suspend should crash the Evaluator.
+            OnNext(new SuspendEventImpl(message));
+            _currentStatus.SetSuspendRequested();
         }
 
         public void Deliver(byte[] message)
@@ -250,15 +243,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task
         public void OnNext(ISuspendEvent value)
         {
             Logger.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)");
-            try
-            {
-                _suspendHandlerFuture.Get().OnNext(value);
-            }
-            catch (Exception ex)
-            {
-                var suspendEx = new TaskSuspendHandlerException("Unable to 
suspend task.", ex);
-                Utilities.Diagnostics.Exceptions.CaughtAndThrow(suspendEx, 
Level.Error, Logger);
-            }
+            _suspendHandlerFuture.Get().OnNext(value);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs
 
b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs
deleted file mode 100644
index 717c399..0000000
--- 
a/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-using System;
-
-namespace Org.Apache.REEF.Common.Tasks.Exceptions
-{
-    /// <summary>
-    /// An exception that is thrown when the task suspension event
-    /// handler is not bound.
-    /// </summary>
-    internal sealed class TaskSuspendHandlerException : Exception
-    {
-        public TaskSuspendHandlerException(string message)
-            : base(message)
-        {
-        }
-
-        public TaskSuspendHandlerException(Exception innerException)
-            : base(innerException.Message, innerException)
-        {
-        }
-
-        public TaskSuspendHandlerException(string message, Exception 
innerException)
-            : base(message, innerException)
-        {
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskSuspendExceptionTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskSuspendExceptionTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskSuspendExceptionTest.cs
new file mode 100644
index 0000000..2ce135a
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskSuspendExceptionTest.cs
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Driver.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions;
+using Org.Apache.REEF.Tests.Functional.Common;
+using Org.Apache.REEF.Tests.Functional.Common.Task;
+using Org.Apache.REEF.Tests.Functional.Common.Task.Handlers;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Failure.User
+{
+    /// <summary>
+    /// This test class contains a test that validates that an Exception in 
the 
+    /// TaskSuspendHandler causes a FailedEvaluator event in the Driver.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public sealed class TaskSuspendExceptionTest : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(TaskSuspendExceptionTest));
+
+        private const string TaskSuspendExceptionMessage = 
"TaskSuspendExceptionMessage";
+        private const string InitialTaskPreWaitMessage = 
"InitialTaskPreWaitMessage";
+        private const string InitialTaskPostWaitMessage = 
"InitialTaskPostWaitMessage";
+        private const string FailedEvaluatorReceived = 
"FailedEvaluatorReceived";
+        private const string TaskSuspensionMessage = "TaskSuspensionMessage";
+
+        /// <summary>
+        /// This test validates that an Exception in the TaskSuspendHandler 
causes a FailedEvaluator event.
+        /// </summary>
+        [Fact]
+        public void TestSuspendTaskWithExceptionOnLocalRuntime()
+        {
+            string testFolder = DefaultRuntimeFolder + 
Guid.NewGuid().ToString("N").Substring(0, 4);
+            TestRun(DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, 
GenericType<TaskSuspendExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<TaskSuspendExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<TaskSuspendExceptionTestDriver>.Class)
+                .Set(DriverConfiguration.OnTaskRunning, 
GenericType<TaskSuspendExceptionTestDriver>.Class)
+                .Build(), typeof(TaskSuspendExceptionTestDriver), 1, 
"testSuspendTaskWithExceptionOnLocalRuntime", "local", testFolder);
+
+            var driverMessages = new List<string>
+            {
+                TaskSuspensionMessage,
+                FailedEvaluatorReceived
+            };
+
+            ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, 
testFolder, 1);
+            ValidateMessageSuccessfullyLogged(driverMessages, "driver", 
DriverStdout, testFolder, 1);
+
+            var evaluatorMessages = new List<string> { 
InitialTaskPreWaitMessage };
+            ValidateMessageSuccessfullyLogged(evaluatorMessages, "Node-*", 
EvaluatorStdout, testFolder, 1);
+            CleanUp(testFolder);
+        }
+
+        private sealed class TaskSuspendExceptionTestDriver :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IRunningTask>,
+            IObserver<IFailedEvaluator>
+        {
+            private static readonly string TaskId = "TaskId";
+
+            private readonly IEvaluatorRequestor _requestor;
+
+            [Inject]
+            private TaskSuspendExceptionTestDriver(IEvaluatorRequestor 
requestor)
+            {
+                _requestor = requestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                // submit the first Task.
+                value.SubmitTask(TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, TaskId)
+                        .Set(TaskConfiguration.Task, 
GenericType<TaskSuspendExceptionTask>.Class)
+                        .Set(TaskConfiguration.OnSuspend, 
GenericType<TaskSuspendHandlerWithException>.Class)
+                        .Build());
+            }
+
+            public void OnNext(IRunningTask value)
+            {
+                if (value.Id == TaskId)
+                {
+                    Logger.Log(Level.Info, TaskSuspensionMessage);
+                    value.Suspend();
+                }
+            }
+
+            public void OnNext(IFailedEvaluator value)
+            {
+                Assert.True(value.FailedTask.IsPresent());
+                Assert.Equal(TaskId, value.FailedTask.Value.Id);
+                Assert.True(value.EvaluatorException.InnerException is 
TestSerializableException);
+                Assert.Equal(TaskSuspendExceptionMessage, 
value.EvaluatorException.InnerException.Message);
+                Logger.Log(Level.Info, FailedEvaluatorReceived);
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class TaskSuspendExceptionTask : WaitingTask
+        {
+            [Inject]
+            private TaskSuspendExceptionTask(EventMonitor monitor)
+                : base(monitor, InitialTaskPreWaitMessage, 
InitialTaskPostWaitMessage)
+            {
+            }
+        }
+
+        private sealed class TaskSuspendHandlerWithException : 
ExceptionThrowingHandler<ISuspendEvent>
+        {
+            [Inject]
+            private TaskSuspendHandlerWithException(EventMonitor monitor)
+                : base(
+                    new TestSerializableException(TaskSuspendExceptionMessage),
+                    close => { monitor.Signal(); })
+            {
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index f56b2bb..f910cfe 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -80,11 +80,11 @@ under the License.
     <Compile Include="Functional\Bridge\TestContextStack.cs" />
     <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" />
     <Compile Include="Functional\Common\EventMonitor.cs" />
-    <Compile Include="Functional\Common\Task\ExceptionTask.cs" />
     <Compile 
Include="Functional\Failure\User\ServiceConstructorExceptionTest.cs" />
     <Compile 
Include="Functional\Failure\User\ReceiveContextMessageExceptionTest.cs" />
     <Compile Include="Functional\Failure\User\ContextStartExceptionTest.cs" />
     <Compile 
Include="Functional\Failure\User\ReceiveTaskMessageExceptionTest.cs" />
+    <Compile Include="Functional\Common\Task\ExceptionTask.cs" />
     <Compile Include="Functional\Failure\User\TaskCallExceptionTest.cs" />
     <Compile 
Include="Functional\Bridge\Exceptions\TestNonSerializableException.cs" />
     <Compile 
Include="Functional\Bridge\Exceptions\TestSerializableException.cs" />
@@ -95,6 +95,7 @@ under the License.
     <Compile Include="Functional\Common\Task\LoggingTask.cs" />
     <Compile 
Include="Functional\Common\Task\Handlers\ExceptionThrowingHandler.cs" />
     <Compile Include="Functional\Failure\User\TaskStartExceptionTest.cs" />
+    <Compile Include="Functional\Failure\User\TaskSuspendExceptionTest.cs" />
     <Compile 
Include="Functional\Failure\User\UnhandledThreadExceptionInTaskTest.cs" />
     <Compile Include="Functional\Common\Task\WaitingTask.cs" />
     <Compile Include="Functional\Driver\DriverTestStartHandler.cs" />

Reply via email to