Repository: reef Updated Branches: refs/heads/master bcf7ff881 -> 68e3cb0f8
[REEF-1427] Validate Task.Call failure => FailedTask Event This addressed the issue by * Moving test from Bridge/TestFailedTaskEventHandler to User/TaskCallExceptionTest. * Adding ExceptionTask for convenience in testing. * Using common classes for testing. JIRA: [REEF-1427](https://issues.apache.org/jira/browse/REEF-1427) This closes #1044 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/68e3cb0f Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/68e3cb0f Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/68e3cb0f Branch: refs/heads/master Commit: 68e3cb0f8efa73f8ef2f52a442e8e32a15d45db9 Parents: bcf7ff8 Author: Andrew Chung <[email protected]> Authored: Fri Jun 10 13:31:18 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Thu Jun 16 19:59:51 2016 -0700 ---------------------------------------------------------------------- .../Bridge/TestFailedTaskEventHandler.cs | 210 ------------------- .../Functional/Common/Task/ExceptionTask.cs | 41 ++++ .../Failure/User/TaskCallExceptionTest.cs | 203 ++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 3 +- 4 files changed, 246 insertions(+), 211 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/68e3cb0f/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs deleted file mode 100644 index 251dc61..0000000 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/Bridge/TestFailedTaskEventHandler.cs +++ /dev/null @@ -1,210 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -using System; -using System.Runtime.Serialization; -using Org.Apache.REEF.Common.Exceptions; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Bridge; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Implementations.Configuration; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions; -using Org.Apache.REEF.Tests.Functional.Bridge.Parameters; -using Org.Apache.REEF.Utilities.Logging; -using Xunit; - -namespace Org.Apache.REEF.Tests.Functional.Bridge -{ - [Collection("FunctionalTests")] - public sealed class TestFailedTaskEventHandler : ReefFunctionalTest - { - private const string FailedTaskMessage = "I have successfully seen all failed tasks."; - private const string ExpectedExceptionMessage = "Expected exception."; - private const int NumFailedTasksExpected = 2; - - [Fact] - [Trait("Priority", "1")] - [Trait("Category", "FunctionalGated")] - [Trait("Description", "Test invocation of FailedTaskHandler. Validates the Task ID of the failure, as well as the Exceptions of the Task failure.")] - //// TODO[JIRA REEF-1184]: add timeout 180 sec - public void TestFailedTaskEventHandlerOnLocalRuntime() - { - string testFolder = DefaultRuntimeFolder + TestId; - TestRun(DriverConfigurations(), typeof(FailedTaskDriver), 1, "failedTaskTest", "local", testFolder); - ValidateSuccessForLocalRuntime(numberOfContextsToClose: 1, numberOfTasksToFail: NumFailedTasksExpected, testFolder: testFolder); - ValidateMessageSuccessfullyLoggedForDriver(FailedTaskMessage, testFolder); - CleanUp(testFolder); - } - - private IConfiguration DriverConfigurations() - { - return DriverConfiguration.ConfigurationModule - .Set(DriverConfiguration.OnDriverStarted, GenericType<FailedTaskDriver>.Class) - .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<FailedTaskDriver>.Class) - .Set(DriverConfiguration.OnTaskFailed, GenericType<FailedTaskDriver>.Class) - .Build(); - } - - private sealed class FailedTaskDriver : IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>, - IObserver<IFailedTask>, IObserver<ICompletedTask> - { - private const string TaskId = "1234567"; - - private static readonly Logger Logger = Logger.GetLogger(typeof(FailedTaskDriver)); - - private readonly IEvaluatorRequestor _requestor; - - private bool _shouldReceiveSerializableException = false; - private int _numFailedTasksReceived = 0; - - [Inject] - private FailedTaskDriver(IEvaluatorRequestor requestor) - { - _requestor = requestor; - } - - public void OnNext(IDriverStarted value) - { - _requestor.Submit(_requestor.NewBuilder().Build()); - } - - public void OnNext(IAllocatedEvaluator value) - { - value.SubmitTask(GetTaskConfiguration()); - } - - public void OnNext(IFailedTask value) - { - _numFailedTasksReceived++; - - if (value.Id != TaskId) - { - throw new Exception("Received Task ID " + value.Id + " instead of the expected Task ID " + TaskId); - } - - if (value.Message == null || value.Message != ExpectedExceptionMessage) - { - throw new Exception("Exception message not properly propagated. Received message " + value.Message); - } - - if (_shouldReceiveSerializableException) - { - if (_numFailedTasksReceived == NumFailedTasksExpected) - { - Logger.Log(Level.Error, FailedTaskMessage); - } - - if (value.AsError() == null || !(value.AsError() is TestSerializableException)) - { - throw new Exception("Exception should have been serialized properly."); - } - - if (value.AsError().Message != ExpectedExceptionMessage) - { - throw new Exception("Incorrect Exception message, got message: " + value.AsError().Message); - } - - value.GetActiveContext().Value.Dispose(); - } - else - { - var taskException = value.AsError(); - if (taskException == null) - { - throw new Exception("Expected a non-null task exception."); - } - - var nonSerializableTaskException = taskException as NonSerializableTaskException; - if (nonSerializableTaskException == null) - { - throw new Exception( - "Expected a NonSerializableTaskException from Task, instead got Exception of type " + taskException.GetType()); - } - - if (!(nonSerializableTaskException.InnerException is SerializationException)) - { - throw new Exception("Expected a SerializationException as the inner Exception of the Task Exception."); - } - - _shouldReceiveSerializableException = true; - value.GetActiveContext().Value.SubmitTask(GetTaskConfiguration()); - } - } - - public void OnNext(ICompletedTask value) - { - throw new Exception("Did not expect a completed task."); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - - private IConfiguration GetTaskConfiguration() - { - var shouldThrowSerializableConfig = TangFactory.GetTang().NewConfigurationBuilder() - .BindNamedParameter<ShouldThrowSerializableException, bool>( - GenericType<ShouldThrowSerializableException>.Class, _shouldReceiveSerializableException.ToString()) - .Build(); - - var taskConfig = TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, TaskId) - .Set(TaskConfiguration.Task, GenericType<FailTask>.Class) - .Build(); - - return Configurations.Merge(shouldThrowSerializableConfig, taskConfig); - } - } - - private sealed class FailTask : ITask - { - private readonly bool _shouldThrowSerializableException; - - [Inject] - private FailTask([Parameter(typeof(ShouldThrowSerializableException))] bool shouldThrowSerializableException) - { - _shouldThrowSerializableException = shouldThrowSerializableException; - } - - public void Dispose() - { - } - - public byte[] Call(byte[] memento) - { - if (_shouldThrowSerializableException) - { - throw new TestSerializableException(ExpectedExceptionMessage); - } - - throw new TestNonSerializableException(ExpectedExceptionMessage); - } - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/68e3cb0f/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/ExceptionTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/ExceptionTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/ExceptionTask.cs new file mode 100644 index 0000000..49b6f86 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Common/Task/ExceptionTask.cs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Tasks; + +namespace Org.Apache.REEF.Tests.Functional.Common.Task +{ + public abstract class ExceptionTask : ITask + { + private readonly Exception _exToThrow; + + protected ExceptionTask(Exception exToThrow) + { + _exToThrow = exToThrow; + } + + public byte[] Call(byte[] memento) + { + throw _exToThrow; + } + + public void Dispose() + { + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/68e3cb0f/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCallExceptionTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCallExceptionTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCallExceptionTest.cs new file mode 100644 index 0000000..87f118f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/User/TaskCallExceptionTest.cs @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Runtime.Serialization; +using Org.Apache.REEF.Common.Exceptions; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Tests.Functional.Bridge.Exceptions; +using Org.Apache.REEF.Tests.Functional.Bridge.Parameters; +using Org.Apache.REEF.Tests.Functional.Common.Task; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.Failure.User +{ + [Collection("FunctionalTests")] + public sealed class TaskCallExceptionTest : ReefFunctionalTest + { + private const string FailedTaskMessage = "I have successfully seen all failed tasks."; + private const string ExpectedExceptionMessage = "Expected exception."; + private const int NumFailedTasksExpected = 2; + + [Fact] + [Trait("Priority", "1")] + [Trait("Category", "FunctionalGated")] + [Trait("Description", "Test invocation of FailedTaskHandler. Validates the Task ID of the failure, as well as the Exceptions of the Task failure.")] + //// TODO[JIRA REEF-1184]: add timeout 180 sec + public void TestFailedTaskEventHandlerOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun( + DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<TaskCallExceptionDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<TaskCallExceptionDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<TaskCallExceptionDriver>.Class) + .Build(), + typeof(TaskCallExceptionDriver), 1, "failedTaskTest", "local", testFolder); + + ValidateSuccessForLocalRuntime(numberOfContextsToClose: 1, numberOfTasksToFail: NumFailedTasksExpected, testFolder: testFolder); + ValidateMessageSuccessfullyLoggedForDriver(FailedTaskMessage, testFolder); + CleanUp(testFolder); + } + + private sealed class TaskCallExceptionDriver : IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>, + IObserver<IFailedTask>, IObserver<ICompletedTask> + { + private const string TaskId = "1234567"; + + private static readonly Logger Logger = Logger.GetLogger(typeof(TaskCallExceptionDriver)); + + private readonly IEvaluatorRequestor _requestor; + + private bool _shouldReceiveSerializableException = false; + private int _numFailedTasksReceived = 0; + + [Inject] + private TaskCallExceptionDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + value.SubmitTask(GetTaskConfiguration()); + } + + public void OnNext(IFailedTask value) + { + _numFailedTasksReceived++; + + if (value.Id != TaskId) + { + throw new Exception("Received Task ID " + value.Id + " instead of the expected Task ID " + TaskId); + } + + if (value.Message == null || value.Message != ExpectedExceptionMessage) + { + throw new Exception("Exception message not properly propagated. Received message " + value.Message); + } + + if (_shouldReceiveSerializableException) + { + if (_numFailedTasksReceived == NumFailedTasksExpected) + { + Logger.Log(Level.Error, FailedTaskMessage); + } + + if (value.AsError() == null || !(value.AsError() is TestSerializableException)) + { + throw new Exception("Exception should have been serialized properly."); + } + + if (value.AsError().Message != ExpectedExceptionMessage) + { + throw new Exception("Incorrect Exception message, got message: " + value.AsError().Message); + } + + value.GetActiveContext().Value.Dispose(); + } + else + { + var taskException = value.AsError(); + if (taskException == null) + { + throw new Exception("Expected a non-null task exception."); + } + + var nonSerializableTaskException = taskException as NonSerializableTaskException; + if (nonSerializableTaskException == null) + { + throw new Exception( + "Expected a NonSerializableTaskException from Task, instead got Exception of type " + taskException.GetType()); + } + + if (!(nonSerializableTaskException.InnerException is SerializationException)) + { + throw new Exception("Expected a SerializationException as the inner Exception of the Task Exception."); + } + + _shouldReceiveSerializableException = true; + value.GetActiveContext().Value.SubmitTask(GetTaskConfiguration()); + } + } + + public void OnNext(ICompletedTask value) + { + throw new Exception("Did not expect a completed task."); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + private IConfiguration GetTaskConfiguration() + { + var shouldThrowSerializableConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindNamedParameter<ShouldThrowSerializableException, bool>( + GenericType<ShouldThrowSerializableException>.Class, _shouldReceiveSerializableException.ToString()) + .Build(); + + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<TaskCallExceptionTask>.Class) + .Build(); + + return Configurations.Merge(shouldThrowSerializableConfig, taskConfig); + } + } + + private sealed class TaskCallExceptionTask : ExceptionTask + { + [Inject] + private TaskCallExceptionTask( + [Parameter(typeof(ShouldThrowSerializableException))] bool shouldThrowSerializableException) + : base(GetExceptionToThrow(shouldThrowSerializableException)) + { + } + + private static Exception GetExceptionToThrow(bool shouldThrowSerializableException) + { + if (shouldThrowSerializableException) + { + return new TestSerializableException(ExpectedExceptionMessage); + } + + return new TestNonSerializableException(ExpectedExceptionMessage); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/68e3cb0f/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 6dde363..2e83ab6 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -79,7 +79,8 @@ under the License. <Compile Include="Functional\Bridge\TestCloseTask.cs" /> <Compile Include="Functional\Bridge\TestContextStack.cs" /> <Compile Include="Functional\Bridge\TestFailedEvaluatorEventHandler.cs" /> - <Compile Include="Functional\Bridge\TestFailedTaskEventHandler.cs" /> + <Compile Include="Functional\Common\Task\ExceptionTask.cs" /> + <Compile Include="Functional\Failure\User\TaskCallExceptionTest.cs" /> <Compile Include="Functional\Bridge\Exceptions\TestNonSerializableException.cs" /> <Compile Include="Functional\Bridge\Exceptions\TestSerializableException.cs" /> <Compile Include="Functional\Bridge\TestSimpleContext.cs" />
