Repository: reef Updated Branches: refs/heads/master 201b0d67f -> be442aedc
[REEF-1316] Adding test for resubmitting context JIRA: [REEF-1316](https://issues.apache.org/jira/browse/REEF-1316) This closes #927 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/be442aed Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/be442aed Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/be442aed Branch: refs/heads/master Commit: be442aedc076310146326cd82df44bcba7f52603 Parents: 201b0d6 Author: Julia Wang <[email protected]> Authored: Tue Apr 5 19:39:21 2016 -0700 Committer: Andrew Chung <[email protected]> Committed: Fri Apr 8 14:22:02 2016 -0700 ---------------------------------------------------------------------- .../FaultTolerant/TestResubmitEvaluator.cs | 265 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + 2 files changed, 266 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/be442aed/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitEvaluator.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitEvaluator.cs new file mode 100644 index 0000000..9a50d75 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestResubmitEvaluator.cs @@ -0,0 +1,265 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Collections.Generic; +using System.Globalization; +using System.Text; +using System.Threading; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.FaultTolerant +{ + [Collection("FunctionalTests")] + public sealed class TestResubmitEvaluator : ReefFunctionalTest + { + private const string FailedEvaluatorMessage = "FailedEvaluatorMessage"; + private const string CompletedTaskValidationMessage = "CompletedTaskValidationmessage"; + private const string CompletedEvaluatorValidationMessage = "CompletedEvaluatorValidationMessage"; + private const string FailSignal = "Fail"; + private const string SuccSignal = "Succ"; + private const string TaskId = "TaskId"; + private const string ContextId = "ContextId"; + + /// <summary> + /// This test is to test evaluator resubmit scenarios, verify events received in failure case and successful cases, + /// validate context and task received with failed evaluator + /// </summary> + [Fact] + [Trait("Priority", "1")] + [Trait("Category", "FunctionalGated")] + [Trait("Description", "Test invocation of ResubmitEvaluatorDriver")] + public void ResubmitEvaluatorOnLocalRuntime() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurations(), typeof(ResubmitEvaluatorDriver), 3, "ResubmitContextOnLocalRuntime", "local", testFolder); + + ValidateSuccessForLocalRuntime(2, numberOfEvaluatorsToFail: 1, testFolder: testFolder); + + var messages = new List<string>(); + messages.Add(FailedEvaluatorMessage); + ValidateMessageSuccessfullyLogged(messages, "driver", DriverStdout, testFolder, 1); + + var messages1 = new List<string>(); + messages1.Add(CompletedTaskValidationMessage); + messages1.Add(CompletedEvaluatorValidationMessage); + ValidateMessageSuccessfullyLogged(messages1, "driver", DriverStdout, testFolder, 2); + + CleanUp(testFolder); + } + + public IConfiguration DriverConfigurations() + { + return DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<ResubmitEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<ResubmitEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorCompleted, GenericType<ResubmitEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<ResubmitEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<ResubmitEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskRunning, GenericType<ResubmitEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextFailed, GenericType<ResubmitEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<ResubmitEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<ResubmitEvaluatorDriver>.Class) + .Build(); + } + + private sealed class ResubmitEvaluatorDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<ICompletedEvaluator>, + IObserver<IFailedEvaluator>, + IObserver<IRunningTask>, + IObserver<IFailedContext>, + IObserver<IFailedTask>, + IObserver<IActiveContext>, + IObserver<ICompletedTask> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(ResubmitEvaluatorDriver)); + + private readonly IEvaluatorRequestor _requestor; + private int _taskNumber = 1; + private int _contextNumber = 1; + private string _failedContextId; + private string _failedTaskId = TaskId + "1"; + private readonly ISet<ICompletedTask> _completedTasks = new HashSet<ICompletedTask>(); + private readonly object _compeletedTaskLock = new object(); + + [Inject] + private ResubmitEvaluatorDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().SetNumber(2).Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + Logger.Log(Level.Info, "AllocatedEvaluator: " + value.Id); + value.SubmitContext( + ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, ContextId + _contextNumber) + .Build()); + Interlocked.Increment(ref _contextNumber); + } + + public void OnNext(IActiveContext value) + { + Logger.Log(Level.Info, "ActiveContext: " + value.Id); + value.SubmitTask(TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId + _taskNumber) + .Set(TaskConfiguration.Task, GenericType<FailEvaluatorTask>.Class) + .Set(TaskConfiguration.OnMessage, GenericType<FailEvaluatorTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<FailEvaluatorTask>.Class) + .Build()); + Interlocked.Increment(ref _taskNumber); + } + + public void OnNext(IRunningTask value) + { + Logger.Log(Level.Info, string.Format(CultureInfo.InvariantCulture, "RunningTask: {0}, Context: {1}.", value.Id, value.ActiveContext.Id)); + if (value.Id.Equals(_failedTaskId)) + { + _failedContextId = value.ActiveContext.Id; + } + string msg = value.Id.Equals(_failedTaskId) ? FailSignal : SuccSignal; + value.Send(Encoding.UTF8.GetBytes(msg)); + } + + public void OnNext(ICompletedTask value) + { + Logger.Log(Level.Info, CompletedTaskValidationMessage + ". Task completed: " + value.Id); + lock (_compeletedTaskLock) + { + _completedTasks.Add(value); + if (_completedTasks.Count < 2) + { + return; + } + foreach (var t in _completedTasks) + { + t.ActiveContext.Dispose(); + } + } + } + + public void OnNext(ICompletedEvaluator value) + { + Logger.Log(Level.Info, CompletedEvaluatorValidationMessage + ". Evaluator completed: " + value.Id); + } + + public void OnNext(IFailedEvaluator value) + { + Logger.Log(Level.Info, FailedEvaluatorMessage + ". Evaluator failed: " + value.Id + " FailedTaskId: " + value.FailedTask.Value.Id); + Assert.NotNull(value.FailedTask); + Assert.NotNull(value.FailedTask.Value); + Assert.NotNull(value.FailedTask.Value.Id); + Assert.Equal(_failedTaskId, value.FailedTask.Value.Id); + Assert.Equal(1, value.FailedContexts.Count); + Assert.Equal(_failedContextId, value.FailedContexts[0].Id); + + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IFailedContext value) + { + throw new Exception("Did not expect Failed Context."); + } + + public void OnNext(IFailedTask value) + { + throw new Exception("Did not expect Failed Task."); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private sealed class FailEvaluatorTask : ITask, IDriverMessageHandler, IObserver<ICloseEvent> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(FailEvaluatorTask)); + + private readonly CountdownEvent _countdownEvent = new CountdownEvent(1); + + [Inject] + private FailEvaluatorTask() + { + } + + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, "Hello in FailEvaluatorTask"); + _countdownEvent.Wait(); + return null; + } + + public void Handle(IDriverMessage value) + { + var message = ByteUtilities.ByteArraysToString(value.Message.Value); + if (message.Equals(FailSignal)) + { + Environment.Exit(1); + } + else + { + _countdownEvent.Signal(); + } + } + + public void OnNext(ICloseEvent value) + { + _countdownEvent.Signal(); + } + + public void Dispose() + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/be442aed/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 6c8f235..4826d54 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -84,6 +84,7 @@ under the License. <Compile Include="Functional\Bridge\TestSuspendTask.cs" /> <Compile Include="Functional\Driver\DriverTestStartHandler.cs" /> <Compile Include="Functional\FaultTolerant\TestContextStart.cs" /> + <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" /> <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" /> <Compile Include="Functional\Driver\TestDriver.cs" /> <Compile Include="Functional\FaultTolerant\PoisonTest.cs" />
