// ----------------------------------------------------------------------
// Provenance: Apache REEF commit notification
//
// Repository: reef
// Updated Branches: refs/heads/master 1a329b4c3 -> 082186a2d
//
// [REEF-1317] Adding test for resubmitting tasks to an Evaluator after a Failed Task
//
// JIRA: [REEF-1317](https://issues.apache.org/jira/browse/REEF-1317)
// Pull Request: This closes #950
//
// Project: http://git-wip-us.apache.org/repos/asf/reef/repo
// Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/082186a2
// Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/082186a2
// Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/082186a2
// Branch: refs/heads/master
// Commit: 082186a2dcc440cea772e103ed7e9918086cc6d9
// Parents: 1a329b4
// Author: Julia Wang <[email protected]>
// Authored: Wed Apr 13 15:27:18 2016 -0700
// Committer: Markus Weimer <[email protected]>
// Committed: Wed Apr 20 16:43:43 2016 -0700
//
// Files changed by the commit:
//   .../FaultTolerant/TestResubmitTask.cs | 298 +++++++++++++++++++
//   .../Org.Apache.REEF.Tests.csproj      |   1 +
//   2 files changed, 299 insertions(+)
//
// File added by the commit:
//   http://git-wip-us.apache.org/repos/asf/reef/blob/082186a2/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs
//   diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs
//   new file mode 100644
//   index 0000000..8e2962f
//   --- /dev/null
//   +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitTask.cs
//   @@ -0,0 +1,298 @@
//   (the committed file had no newline at end of file)
//
// The code below is that file with its line structure restored and review
// fixes applied; the second file touched by the commit is recorded at the
// bottom of this file.
// ----------------------------------------------------------------------

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Org.Apache.REEF.Common.Context;
using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Common.Tasks.Events;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Context;
using Org.Apache.REEF.Driver.Evaluator;
using Org.Apache.REEF.Driver.Task;
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Logging;
using Xunit;

namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
{
    /// <summary>
    /// Scenario test that checks the events and messages received when a task is
    /// resubmitted on an existing context after the previous task failed:
    /// * Submit a task on an active context.
    /// * Once the task is running, the driver sends a close event to the task.
    /// * The task throws an exception whose message tells the driver that the task
    ///   failed as instructed by the driver.
    /// * The driver receives the FailedTask event and resubmits a task on the existing context.
    /// * In the task's IDriverMessage handler, verify that the message sent from the driver is the expected one.
    /// * In the task's ICloseEvent handler, verify that the message in the close event is the expected one.
    /// The test submits two evaluators/contexts/tasks, lets both tasks be closed, and verifies:
    /// * In IFailedTask, the task-to-context mapping is the same as the assignment made when the task was submitted.
    /// * In IFailedTask, the exception message is the same as the one thrown in the task.
    /// * In ICompletedTask, the task-to-context mapping is still the same as the assignment made when the task was submitted.
    /// Test verification:
    /// * numberOfContextsToClose == 2
    /// * numberOfTasksToFail == 2
    /// * numberOfEvaluatorsToFail == 0
    /// If any of the above verifications fails, the test fails.
    /// </summary>
    /// <remarks>
    /// The class name keeps its original spelling ("Resubimit") because it is the
    /// public name of the test; the file itself is named TestResubmitTask.cs.
    /// </remarks>
    [Collection("FunctionalTests")]
    public sealed class TestResubimitTask : ReefFunctionalTest
    {
        private static readonly Logger Logger = Logger.GetLogger(typeof(TestResubimitTask));

        // Payload the driver attaches when it closes the first two tasks.
        private const string KillTaskCommandFromDriver = "KillTaskCommandFromDriver";

        // Payload the driver sends to the resubmitted tasks to let them return.
        private const string CompleteTaskCommandFromDriver = "CompleteTaskCommandFromDriver";

        // Exception message thrown by a task that was closed as instructed.
        private const string TaskKilledByDriver = "TaskKilledByDriver";

        private const string UnExpectedCloseMessage = "UnExpectedCloseMessage";
        private const string UnExpectedCompleteMessage = "UnExpectedCompleteMessage";

        /// <summary>
        /// Submits two evaluators/contexts/tasks, closes the two running tasks and resubmits two new
        /// tasks on the existing active contexts. Verifies that the events and messages received are
        /// the expected ones, i.e. that tasks can be submitted on existing contexts after previous
        /// tasks failed.
        /// </summary>
        [Fact]
        public void TestStopAndResubmitTaskOnLocalRuntime()
        {
            string testFolder = DefaultRuntimeFolder + TestId;
            TestRun(DriverConfigurations(), typeof(ResubmitTaskTestDriver), 2, "TestResubimitTask", "local", testFolder);
            ValidateSuccessForLocalRuntime(2, 2, 0, testFolder);
            CleanUp(testFolder);
        }

        /// <summary>
        /// Driver configuration for the test driver.
        /// </summary>
        /// <returns>
        /// A configuration that registers <see cref="ResubmitTaskTestDriver"/> as the handler for the
        /// driver-started, evaluator-allocated, context-active, task-running, task-completed and
        /// task-failed events.
        /// </returns>
        public IConfiguration DriverConfigurations()
        {
            return DriverConfiguration.ConfigurationModule
                .Set(DriverConfiguration.OnDriverStarted, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnContextActive, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnTaskRunning, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnTaskCompleted, GenericType<ResubmitTaskTestDriver>.Class)
                .Set(DriverConfiguration.OnTaskFailed, GenericType<ResubmitTaskTestDriver>.Class)
                .Build();
        }

        /// <summary>
        /// Test driver. Requests two evaluators, submits one context and one task on each, closes the
        /// first two tasks, and resubmits a task on the context of every failed task.
        /// </summary>
        private sealed class ResubmitTaskTestDriver :
            IObserver<IDriverStarted>,
            IObserver<IAllocatedEvaluator>,
            IObserver<IActiveContext>,
            IObserver<ICompletedTask>,
            IObserver<IFailedTask>,
            IObserver<IRunningTask>
        {
            private const string TaskId = "TaskId";
            private const string ContextId = "ContextId";

            private readonly IEvaluatorRequestor _requestor;

            // Task id -> id of the context the task was submitted on.
            private readonly IDictionary<string, string> _taskContextMapping = new Dictionary<string, string>();

            // Guards the two counters below and _taskContextMapping.
            private readonly object _lock = new object();

            private int _taskNumber = 1;
            private int _contextNumber = 1;

            [Inject]
            private ResubmitTaskTestDriver(IEvaluatorRequestor evaluatorRequestor)
            {
                _requestor = evaluatorRequestor;
            }

            /// <summary>
            /// Requests two evaluators when the driver starts.
            /// </summary>
            /// <param name="value">The driver-started event.</param>
            public void OnNext(IDriverStarted value)
            {
                _requestor.Submit(_requestor.NewBuilder().SetNumber(2).Build());
            }

            /// <summary>
            /// Submits a context with the next sequential id ("ContextId1", "ContextId2", ...) on the
            /// allocated evaluator.
            /// </summary>
            /// <param name="value">The allocated evaluator.</param>
            public void OnNext(IAllocatedEvaluator value)
            {
                lock (_lock)
                {
                    value.SubmitContext(
                        ContextConfiguration.ConfigurationModule
                            .Set(ContextConfiguration.Identifier, ContextId + _contextNumber)
                            .Build());
                    _contextNumber++;
                }
            }

            /// <summary>
            /// Submits a task with the next sequential id ("TaskId1", "TaskId2", ...) on the given
            /// context and records which context the task was assigned to. Also called directly from
            /// the IFailedTask handler to resubmit a task on the same context.
            /// </summary>
            /// <param name="value">The active context to submit the task on.</param>
            public void OnNext(IActiveContext value)
            {
                lock (_lock)
                {
                    string taskId = TaskId + _taskNumber;
                    value.SubmitTask(GetTaskConfigurationForCloseTask(taskId));
                    _taskContextMapping.Add(taskId, value.Id);
                    _taskNumber++;
                }
            }

            /// <summary>
            /// Verifies that the completed task ran on the context it was submitted on, then disposes
            /// that context.
            /// </summary>
            /// <param name="value">The completed task.</param>
            public void OnNext(ICompletedTask value)
            {
                Logger.Log(Level.Info, "Task completed: " + value.Id);
                VerifyContextTaskMapping(value.Id, value.ActiveContext.Id);
                value.ActiveContext.Dispose();
            }

            /// <summary>
            /// Verifies that when an exception is thrown in the task, the IFailedTask received here
            /// carries the message set in the task, and that the context associated with the failed
            /// task is the context the task was submitted on. Then resubmits a task on that context.
            /// </summary>
            /// <param name="value">The failed task.</param>
            public void OnNext(IFailedTask value)
            {
                var failedException = ByteUtilities.ByteArraysToString(value.Data.Value);
                Logger.Log(Level.Error, "In IFailedTask: " + failedException);

                // Resolve the context once; it is used both for verification and for resubmission.
                IActiveContext activeContext = value.GetActiveContext().Value;
                VerifyContextTaskMapping(value.Id, activeContext.Id);
                Assert.Contains(TaskKilledByDriver, failedException);

                OnNext(activeContext);
            }

            /// <summary>
            /// Asserts that <paramref name="taskId"/> was recorded at submission time and that it was
            /// assigned to <paramref name="contextId"/>.
            /// </summary>
            /// <param name="taskId">Id of the task reported by the event.</param>
            /// <param name="contextId">Id of the context reported by the event.</param>
            private void VerifyContextTaskMapping(string taskId, string contextId)
            {
                lock (_lock)
                {
                    string expectedContextId;

                    // Fail with an explicit message for an unknown task id instead of comparing
                    // against a null expected value.
                    Assert.True(
                        _taskContextMapping.TryGetValue(taskId, out expectedContextId),
                        "No context was recorded for task " + taskId);
                    Assert.Equal(expectedContextId, contextId);
                }
            }

            /// <summary>
            /// Closes the first two tasks and sends the complete message to the 3rd and 4th tasks.
            /// </summary>
            /// <param name="value">The running task.</param>
            public void OnNext(IRunningTask value)
            {
                Logger.Log(Level.Info, "Task running: " + value.Id);
                switch (value.Id)
                {
                    case TaskId + "1":
                    case TaskId + "2":
                        value.Dispose(Encoding.UTF8.GetBytes(KillTaskCommandFromDriver));
                        break;
                    case TaskId + "3":
                    case TaskId + "4":
                        value.Send(Encoding.UTF8.GetBytes(CompleteTaskCommandFromDriver));
                        break;
                    default:
                        throw new Exception("It should not be reached.");
                }
            }

            public void OnCompleted()
            {
                throw new NotImplementedException();
            }

            public void OnError(Exception error)
            {
                throw new NotImplementedException();
            }

            /// <summary>
            /// Builds the configuration of a <see cref="ResubmitTask"/> that also handles close events
            /// and driver messages.
            /// </summary>
            /// <param name="taskId">Identifier to give to the task.</param>
            /// <returns>The task configuration.</returns>
            private static IConfiguration GetTaskConfigurationForCloseTask(string taskId)
            {
                return TaskConfiguration.ConfigurationModule
                    .Set(TaskConfiguration.Identifier, taskId)
                    .Set(TaskConfiguration.Task, GenericType<ResubmitTask>.Class)
                    .Set(TaskConfiguration.OnClose, GenericType<ResubmitTask>.Class)
                    .Set(TaskConfiguration.OnMessage, GenericType<ResubmitTask>.Class)
                    .Build();
            }
        }

        /// <summary>
        /// Test task. <see cref="Call"/> blocks until either the close handler or the driver message
        /// handler signals it to return.
        /// </summary>
        private sealed class ResubmitTask : ITask, IDriverMessageHandler, IObserver<ICloseEvent>
        {
            // Released by the handlers below to let Call return.
            private readonly CountdownEvent _suspendSignal = new CountdownEvent(1);

            [Inject]
            private ResubmitTask()
            {
            }

            /// <summary>
            /// Blocks until the task is signaled, then returns without a result.
            /// </summary>
            /// <param name="memento">Not used.</param>
            /// <returns>Always null.</returns>
            public byte[] Call(byte[] memento)
            {
                Logger.Log(Level.Info, "Hello in ResubmitTask");
                _suspendSignal.Wait();
                return null;
            }

            public void Dispose()
            {
                Logger.Log(Level.Info, "Task is disposed.");
            }

            /// <summary>
            /// When receiving the close event, verify the command from the driver. If it matches the
            /// expected message, throw an exception to close the task. Otherwise (unexpected payload
            /// or no payload at all), signal the task to return, which results in a test failure
            /// because the test expects two failed tasks.
            /// </summary>
            /// <param name="value">The close event sent by the driver.</param>
            public void OnNext(ICloseEvent value)
            {
                if (value.Value != null && value.Value.Value != null)
                {
                    var command = Encoding.UTF8.GetString(value.Value.Value);
                    Logger.Log(Level.Info, "Closed event received in task:" + command);
                    if (command.Equals(KillTaskCommandFromDriver))
                    {
                        throw new Exception(TaskKilledByDriver);
                    }
                }

                // Reached for an unexpected payload and for a close event without payload. Without
                // the signal in the latter case, Call would stay blocked instead of returning.
                Logger.Log(Level.Error, UnExpectedCloseMessage);
                _suspendSignal.Signal();
            }

            /// <summary>
            /// Expects the complete message from the driver. If the message is the expected one,
            /// signal the task to return properly. Otherwise, throw an exception, which causes an
            /// unexpected failed task and therefore fails the test verification.
            /// </summary>
            /// <param name="value">The message sent by the driver.</param>
            public void Handle(IDriverMessage value)
            {
                var message = ByteUtilities.ByteArraysToString(value.Message.Value);
                Logger.Log(Level.Info, "Complete task message received in task:" + message);
                if (message.Equals(CompleteTaskCommandFromDriver))
                {
                    _suspendSignal.Signal();
                }
                else
                {
                    Logger.Log(Level.Error, UnExpectedCompleteMessage);
                    throw new Exception(UnExpectedCompleteMessage);
                }
            }

            public void OnCompleted()
            {
                throw new NotImplementedException();
            }

            public void OnError(Exception error)
            {
                throw new NotImplementedException();
            }
        }
    }
}

// ----------------------------------------------------------------------
// Second file changed by the same commit (registers this test in the project):
//   http://git-wip-us.apache.org/repos/asf/reef/blob/082186a2/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
//
// diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
// index 5d501a8..81b3b41 100644
// --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
// +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
// @@ -85,6 +85,7 @@ under the License.
//      <Compile Include="Functional\Driver\DriverTestStartHandler.cs" />
//      <Compile Include="Functional\FaultTolerant\TestContextStart.cs" />
//      <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" />
// +    <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" />
//      <Compile Include="Functional\IMRU\IMRUBroadcastReduceTest.cs" />
//      <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
//      <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriverSpecifyingDefaultRuntimeName.cs" />
// ----------------------------------------------------------------------
