Repository: reef Updated Branches: refs/heads/master df1a226d6 -> 9ee932462
[REEF-1429] Validate Task Suspend failure => FailedEvaluator Event This addressed the issue by * Adding a test to validate that a failure in task suspension triggers a FailedEvaluator. * Removing the exception handling around task suspension in TaskRuntime, so that an Exception thrown by the suspend handler crashes the Evaluator instead of being wrapped and recorded on the Task status. * Deleting TaskSuspendHandlerException and a private TaskClientCodeException constructor. * Returning NULL from JavaByteArrayFromManagedByteArray when the managed array is null, and removing JavaLongArrayFromManagedLongArray. JIRA: [REEF-1429](https://issues.apache.org/jira/browse/REEF-1429) This closes #1066 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/9ee93246 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/9ee93246 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/9ee93246 Branch: refs/heads/master Commit: 9ee932462ae4946514e2c4f601e3f6528ca3af13 Parents: df1a226 Author: Andrew Chung <[email protected]> Authored: Fri Jun 17 13:52:32 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Fri Jul 1 17:51:17 2016 -0700 ---------------------------------------------------------------------- lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp | 19 +-- lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h | 4 - .../Org.Apache.REEF.Common.csproj | 1 - .../Evaluator/Task/TaskClientCodeException.cs | 18 +-- .../Runtime/Evaluator/Task/TaskRuntime.cs | 25 +-- .../Exceptions/TaskSuspendHandlerException.cs | 43 ----- .../Failure/User/TaskSuspendExceptionTest.cs | 159 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 3 +- 8 files changed, 174 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp index 66e1a01..69c28bc 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp @@ -109,19 +109,14 @@ array<byte>^ ManagedByteArrayFromJavaByteArray( jbyteArray JavaByteArrayFromManagedByteArray( JNIEnv *env, array<byte>^ managedByteArray) { - jbyteArray 
javaByteArray = env->NewByteArray(managedByteArray->Length); - pin_ptr<Byte> p = &managedByteArray[0]; - env->SetByteArrayRegion(javaByteArray, 0, managedByteArray->Length, (jbyte*) p); - return javaByteArray; -} + if (managedByteArray != nullptr) { + jbyteArray javaByteArray = env->NewByteArray(managedByteArray->Length); + pin_ptr<Byte> p = &managedByteArray[0]; + env->SetByteArrayRegion(javaByteArray, 0, managedByteArray->Length, (jbyte*)p); + return javaByteArray; + } -jlongArray JavaLongArrayFromManagedLongArray( - JNIEnv *env, - array<unsigned long long>^ managedLongArray) { - jlongArray javaLongArray = env->NewLongArray(managedLongArray->Length); - pin_ptr<unsigned long long> p = &managedLongArray[0]; - env->SetLongArrayRegion(javaLongArray, 0, managedLongArray->Length, (jlong*) p); - return javaLongArray; + return NULL; } JNIEnv* RetrieveEnv(JavaVM* jvm) { http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h index 7d3baa5..aa50ddd 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h +++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h @@ -56,10 +56,6 @@ jbyteArray JavaByteArrayFromManagedByteArray( JNIEnv *env, array<byte>^ managedByteArray); -jlongArray JavaLongArrayFromManagedLongArray( - JNIEnv *env, - array<unsigned long long>^ managedLongArray); - JNIEnv* RetrieveEnv(JavaVM* jvm); String^ FormatJavaExceptionMessage(String^ errorMessage, Exception^ exception); http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index d438ad1..29955d8 100644 --- 
a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -188,7 +188,6 @@ under the License. <Compile Include="Tasks\Events\ITaskStart.cs" /> <Compile Include="Tasks\Events\ITaskStop.cs" /> <Compile Include="Tasks\Exceptions\TaskCloseHandlerNotBoundException.cs" /> - <Compile Include="Tasks\Exceptions\TaskSuspendHandlerException.cs" /> <Compile Include="Tasks\IDriverConnectionMessageHandler.cs" /> <Compile Include="Tasks\IDriverMessageHandler.cs" /> <Compile Include="Tasks\IDriverConnectionMessage.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs index 47ceb8d..2b12c7c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskClientCodeException.cs @@ -23,7 +23,7 @@ using Org.Apache.REEF.Common.Exceptions; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task { /// <summary> - /// An Exception thrown when Task operations (Start, Stop, Suspend) fail. + /// An Exception thrown when Task creation fails. 
/// </summary> [Serializable] public sealed class TaskClientCodeException : Exception @@ -74,22 +74,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task } /// <summary> - /// construct the exception that caused the Task to fail - /// </summary> - /// <param name="taskId"> the id of the failed task.</param> - /// <param name="contextId"> the id of the context the failed Task was executing in.</param> - /// <param name="message"> the error message </param> - private TaskClientCodeException( - string taskId, - string contextId, - string message) - : base(message) - { - _taskId = taskId; - _contextId = contextId; - } - - /// <summary> /// Constructor used for serialization. /// </summary> private TaskClientCodeException(SerializationInfo info, StreamingContext context) http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs index 2feccd2..330c7b4 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Task/TaskRuntime.cs @@ -217,17 +217,10 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task Logger.Log(Level.Warning, string.Format(CultureInfo.InvariantCulture, "Trying to suspend an task that is in {0} state. Ignored.", _currentStatus.State)); return; } - try - { - OnNext(new SuspendEventImpl(message)); - _currentStatus.SetSuspendRequested(); - } - catch (Exception e) - { - Utilities.Diagnostics.Exceptions.Caught(e, Level.Error, "Error during Suspend.", Logger); - _currentStatus.SetException( - TaskClientCodeException.Create(TaskId, ContextId, "Error during Suspend().", e)); - } + + // An Exception in suspend should crash the Evaluator. 
+ OnNext(new SuspendEventImpl(message)); + _currentStatus.SetSuspendRequested(); } public void Deliver(byte[] message) @@ -250,15 +243,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Task public void OnNext(ISuspendEvent value) { Logger.Log(Level.Info, "TaskRuntime::OnNext(ISuspendEvent value)"); - try - { - _suspendHandlerFuture.Get().OnNext(value); - } - catch (Exception ex) - { - var suspendEx = new TaskSuspendHandlerException("Unable to suspend task.", ex); - Utilities.Diagnostics.Exceptions.CaughtAndThrow(suspendEx, Level.Error, Logger); - } + _suspendHandlerFuture.Get().OnNext(value); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs b/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs deleted file mode 100644 index 717c399..0000000 --- a/lang/cs/Org.Apache.REEF.Common/Tasks/Exceptions/TaskSuspendHandlerException.cs +++ /dev/null @@ -1,43 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -using System; - -namespace Org.Apache.REEF.Common.Tasks.Exceptions -{ - /// <summary> - /// An exception that is thrown when the task suspension event - /// handler is not bound. - /// </summary> - internal sealed class TaskSuspendHandlerException : Exception - { - public TaskSuspendHandlerException(string message) - : base(message) - { - } - - public TaskSuspendHandlerException(Exception innerException) - : base(innerException.Message, innerException) - { - } - - public TaskSuspendHandlerException(string message, Exception innerException) - : base(message, innerException) - { - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskSuspendExceptionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskSuspendExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskSuspendExceptionTest.cs new file mode 100644 index 0000000..2ce135a --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskSuspendExceptionTest.cs @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Collections.Generic; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions; +using Org.Apache.REEF.Tests.Functional.Common; +using Org.Apache.REEF.Tests.Functional.Common.Task; +using Org.Apache.REEF.Tests.Functional.Common.Task.Handlers; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Failure.User +{ + /// <summary> + /// This test class contains a test that validates that an Exception in the + /// TaskSuspendHandler causes a FailedEvaluator event in the Driver. + /// </summary> + [Collection("FunctionalTests")] + public sealed class TaskSuspendExceptionTest : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskSuspendExceptionTest)); + + private const string TaskSuspendExceptionMessage = "TaskSuspendExceptionMessage"; + private const string InitialTaskPreWaitMessage = "InitialTaskPreWaitMessage"; + private const string InitialTaskPostWaitMessage = "InitialTaskPostWaitMessage"; + private const string FailedEvaluatorReceived = "FailedEvaluatorReceived"; + private const string TaskSuspensionMessage = "TaskSuspensionMessage"; + + /// <summary> + /// This test validates that an Exception in the TaskSuspendHandler causes a FailedEvaluator event. 
+ /// </summary> + [Fact] + public void TestSuspendTaskWithExceptionOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + Guid.NewGuid().ToString("N").Substring(0, 4); + TestRun(DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<TaskSuspendExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TaskSuspendExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<TaskSuspendExceptionTestDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<TaskSuspendExceptionTestDriver>.Class) + .Build(), typeof(TaskSuspendExceptionTestDriver), 1, "testSuspendTaskWithExceptionOnLocalRuntime", "local", testFolder); + + var driverMessages = new List<string> + { + TaskSuspensionMessage, + FailedEvaluatorReceived + }; + + ValidateMessagesSuccessfullyLoggedForDriver(driverMessages, testFolder, 1); + ValidateMessageSuccessfullyLogged(driverMessages, "driver", DriverStdout, testFolder, 1); + + var evaluatorMessages = new List<string> { InitialTaskPreWaitMessage }; + ValidateMessageSuccessfullyLogged(evaluatorMessages, "Node-*", EvaluatorStdout, testFolder, 1); + CleanUp(testFolder); + } + + private sealed class TaskSuspendExceptionTestDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IRunningTask>, + IObserver<IFailedEvaluator> + { + private static readonly string TaskId = "TaskId"; + + private readonly IEvaluatorRequestor _requestor; + + [Inject] + private TaskSuspendExceptionTestDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + // submit the first Task. 
+ value.SubmitTask(TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<TaskSuspendExceptionTask>.Class) + .Set(TaskConfiguration.OnSuspend, GenericType<TaskSuspendHandlerWithException>.Class) + .Build()); + } + + public void OnNext(IRunningTask value) + { + if (value.Id == TaskId) + { + Logger.Log(Level.Info, TaskSuspensionMessage); + value.Suspend(); + } + } + + public void OnNext(IFailedEvaluator value) + { + Assert.True(value.FailedTask.IsPresent()); + Assert.Equal(TaskId, value.FailedTask.Value.Id); + Assert.True(value.EvaluatorException.InnerException is TestSerializableException); + Assert.Equal(TaskSuspendExceptionMessage, value.EvaluatorException.InnerException.Message); + Logger.Log(Level.Info, FailedEvaluatorReceived); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private sealed class TaskSuspendExceptionTask : WaitingTask + { + [Inject] + private TaskSuspendExceptionTask(EventMonitor monitor) + : base(monitor, InitialTaskPreWaitMessage, InitialTaskPostWaitMessage) + { + } + } + + private sealed class TaskSuspendHandlerWithException : ExceptionThrowingHandler<ISuspendEvent> + { + [Inject] + private TaskSuspendHandlerWithException(EventMonitor monitor) + : base( + new TestSerializableException(TaskSuspendExceptionMessage), + close => { monitor.Signal(); }) + { + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/9ee93246/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index f56b2bb..f910cfe 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -80,11 +80,11 @@ under the License. <Compile Include="Functional\Bridge\TestContextStack.cs" /> <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" /> <Compile Include="Functional\Common\EventMonitor.cs" /> - <Compile Include="Functional\Common\Task\ExceptionTask.cs" /> <Compile Include="Functional\Failure\User\ServiceConstructorExceptionTest.cs" /> <Compile Include="Functional\Failure\User\ReceiveContextMessageExceptionTest.cs" /> <Compile Include="Functional\Failure\User\ContextStartExceptionTest.cs" /> <Compile Include="Functional\Failure\User\ReceiveTaskMessageExceptionTest.cs" /> + <Compile Include="Functional\Common\Task\ExceptionTask.cs" /> <Compile Include="Functional\Failure\User\TaskCallExceptionTest.cs" /> <Compile Include="Functional\Bridge\Exceptions\TestNonSerializableException.cs" /> <Compile Include="Functional\Bridge\Exceptions\TestSerializableException.cs" /> @@ -95,6 +95,7 @@ under the License. <Compile Include="Functional\Common\Task\LoggingTask.cs" /> <Compile Include="Functional\Common\Task\Handlers\ExceptionThrowingHandler.cs" /> <Compile Include="Functional\Failure\User\TaskStartExceptionTest.cs" /> + <Compile Include="Functional\Failure\User\TaskSuspendExceptionTest.cs" /> <Compile Include="Functional\Failure\User\UnhandledThreadExceptionInTaskTest.cs" /> <Compile Include="Functional\Common\Task\WaitingTask.cs" /> <Compile Include="Functional\Driver\DriverTestStartHandler.cs" />
