Repository: reef Updated Branches: refs/heads/master d6b56c6f6 -> fda3ee620
[REEF-1304] Create tests which use .NET Poison to validate evaluator failure scenarios This change: * refactors helper classes for sleep task and poisoned drivers out of PoisonTest. * changes PoisonedEventHandler to always throw exception in a separate thread. * converts PoisonTest to TestEvaluatorWithRunningTaskDelayedPoison. * creates tests for other evaluator failure scenarios. JIRA: [REEF-1304](https://issues.apache.org/jira/browse/REEF-1304) Pull request: This closes #975 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/fda3ee62 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/fda3ee62 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/fda3ee62 Branch: refs/heads/master Commit: fda3ee6205b36789908c466272ee0b161dd91f1d Parents: d6b56c6 Author: Mariia Mykhailova <[email protected]> Authored: Tue Apr 26 15:41:37 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri May 6 17:21:27 2016 -0700 ---------------------------------------------------------------------- .../Poison/PoisonedEventHandler.cs | 7 +- .../Failure/BasePoisonedEvaluatorDriver.cs | 101 ++++++++++ ...ePoisonedEvaluatorWithActiveContextDriver.cs | 70 +++++++ ...asePoisonedEvaluatorWithRunningTaskDriver.cs | 76 ++++++++ .../Functional/Failure/SleepTask.cs | 66 +++++++ ...stEvaluatorWithActiveContextDelayedPoison.cs | 105 +++++++++++ ...EvaluatorWithActiveContextImmediatePoison.cs | 105 +++++++++++ ...stEvaluatorWithCompletedTaskDelayedPoison.cs | 104 +++++++++++ ...TestEvaluatorWithRunningTaskDelayedPoison.cs | 95 ++++++++++ ...stEvaluatorWithRunningTaskImmediatePoison.cs | 95 ++++++++++ .../Functional/FaultTolerant/PoisonTest.cs | 186 ------------------- .../Functional/ReefFunctionalTest.cs | 6 +- .../Org.Apache.REEF.Tests.csproj | 10 +- 13 files changed, 831 insertions(+), 195 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs b/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs index a5f07da..474afa4 100644 --- a/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs +++ b/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs @@ -56,18 +56,15 @@ namespace Org.Apache.REEF.Common.Poison /// <summary> /// Throws a PoisonException with probability CrashProbability between time CrashMinDelay and CrashMinDelay + CrashTimeout. + /// Uses a separate thread to throw the exception. /// </summary> public void OnNext(T value) { - Logger.Log(Level.Verbose, "Poisoned handler for {0}", typeof(T).FullName); + Logger.Log(Level.Info, "Poisoned handler for {0}", typeof(T).FullName); if (_rand.NextDouble() <= _crashProbability) { int timeToCrash = _rand.Next(_crashTimeout) + _crashMinDelay; Logger.Log(Level.Info, "Poisoning successful, crashing in {0} msec.", timeToCrash); - if (timeToCrash == 0) - { - throw new PoisonException("Crashed at " + DateTime.Now); - } IObserver<Alarm> poisonedAlarm = Observer.Create<Alarm>( x => { http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorDriver.cs new file mode 100644 index 0000000..e75a38d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorDriver.cs @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Tests.Functional.Failure +{ + /// <summary> + /// Base class used by evaluator poisoning tests. + /// In case of evaluator failure, we expect to NOT get Failed/Closed context or Failed/Completed task. + /// This driver ensures that in case of any of these events an exception is thrown. + /// Also, this driver abstracts the common IDriverStarted handler which submits evaluator request. 
+ /// </summary> + internal class BasePoisonedEvaluatorDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IClosedContext>, + IObserver<IFailedContext>, + IObserver<ICompletedTask>, + IObserver<IFailedTask> + { + public static readonly string UnexpectedFailedContext = "A failed context was not expected."; + public static readonly string UnexpectedClosedContext = "A closed context was not expected."; + public static readonly string UnexpectedFailedTask = "A failed task was not expected."; + public static readonly string UnexpectedCompletedTask = "A completed task was not expected."; + + private readonly IEvaluatorRequestor _requestor; + protected readonly string ContextId; + protected readonly string TaskId; + + [Inject] + protected BasePoisonedEvaluatorDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + ContextId = Guid.NewGuid().ToString("N").Substring(0, 8); + TaskId = Guid.NewGuid().ToString("N").Substring(0, 8); + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public virtual void OnNext(IAllocatedEvaluator value) + { + value.SubmitContext(ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, ContextId) + .Build()); + } + + public void OnNext(IFailedContext value) + { + throw new Exception(UnexpectedFailedContext); + } + + public void OnNext(IClosedContext value) + { + throw new Exception(UnexpectedClosedContext); + } + + public void OnNext(IFailedTask value) + { + throw new Exception(UnexpectedFailedTask); + } + + public virtual void OnNext(ICompletedTask value) + { + throw new Exception(UnexpectedCompletedTask); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorWithActiveContextDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorWithActiveContextDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorWithActiveContextDriver.cs new file mode 100644 index 0000000..5c51676 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorWithActiveContextDriver.cs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Failure +{ + /// <summary> + /// Base class used by poisoning tests in which the evaluator has an active context but no running task (the task has either not started yet or has already completed). + /// In case of evaluator failure, we expect to get a Failed evaluator event, with attached information about the failed context but no information about a failed task. + /// This driver implements this check. 
+ /// </summary> + internal class BasePoisonedEvaluatorWithActiveContextDriver : + BasePoisonedEvaluatorDriver, + IObserver<IFailedEvaluator> + { + protected static readonly Logger Logger = Logger.GetLogger(typeof(BasePoisonedEvaluatorWithActiveContextDriver)); + internal const string FailedEvaluatorMessage = "I have seen a failed evaluator with correct failed context and no task."; + + [Inject] + internal BasePoisonedEvaluatorWithActiveContextDriver(IEvaluatorRequestor requestor) : base(requestor) + { + } + + public void OnNext(IFailedEvaluator value) + { + if (value.FailedTask.Value != null && value.FailedTask.IsPresent()) + { + throw new Exception("Unexpected failed Task associated with failed Evaluator."); + } + + var expectedStr = "expected a single Context with Context ID " + ContextId + "."; + + if (value.FailedContexts == null) + { + throw new Exception("No Context was present but " + expectedStr); + } + + if (value.FailedContexts.Count != 1) + { + throw new Exception("Collection of failed Contexts contains " + value.FailedContexts.Count + " failed Contexts but " + expectedStr); + } + + if (value.FailedContexts[0].Id != ContextId) + { + throw new Exception("Failed Context ID " + value.FailedContexts[0].Id + ", expected " + ContextId + "."); + } + + // this log line is used for test success validation + Logger.Log(Level.Info, FailedEvaluatorMessage); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorWithRunningTaskDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorWithRunningTaskDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorWithRunningTaskDriver.cs new file mode 100644 index 0000000..6bc6a5b --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/BasePoisonedEvaluatorWithRunningTaskDriver.cs @@ -0,0 +1,76 @@ +// 
Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Failure +{ + /// <summary> + /// Base class used by poisoning tests in which evaluator has running task. + /// In case of evaluator failure, we expect to get Failed evaluator event, with attached information about failed task and context. + /// This driver implements this check. 
+ /// </summary> + internal class BasePoisonedEvaluatorWithRunningTaskDriver : + BasePoisonedEvaluatorDriver, + IObserver<IFailedEvaluator> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(BasePoisonedEvaluatorWithRunningTaskDriver)); + internal const string FailedEvaluatorMessage = "I have seen a failed evaluator with correct failed context and task."; + + [Inject] + internal BasePoisonedEvaluatorWithRunningTaskDriver(IEvaluatorRequestor requestor) : base(requestor) + { + } + + public void OnNext(IFailedEvaluator value) + { + if (value.FailedTask.Value == null || !value.FailedTask.IsPresent()) + { + throw new Exception("No failed Task associated with failed Evaluator."); + } + + if (value.FailedTask.Value.Id != TaskId) + { + throw new Exception("Failed Task ID returned " + value.FailedTask.Value.Id + + ", was expecting Task ID " + TaskId); + } + + var expectedStr = "expected a single Context with Context ID " + ContextId + "."; + + if (value.FailedContexts == null) + { + throw new Exception("No Context was present but " + expectedStr); + } + + if (value.FailedContexts.Count != 1) + { + throw new Exception("Collection of failed Contexts contains " + value.FailedContexts.Count + " failed Contexts but " + expectedStr); + } + + if (value.FailedContexts[0].Id != ContextId) + { + throw new Exception("Failed Context ID " + value.FailedContexts[0].Id + ", expected " + ContextId + "."); + } + + // this log line is used for test success validation + Logger.Log(Level.Info, FailedEvaluatorMessage); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/SleepTask.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/SleepTask.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/SleepTask.cs new file mode 100644 index 0000000..eb5af2b --- /dev/null +++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/SleepTask.cs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using System.Threading; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Common.Tasks.Events; + +namespace Org.Apache.REEF.Tests.Functional.Failure +{ + internal sealed class SleepTask : ITask, IObserver<ICloseEvent> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(SleepTask)); + private const string Prefix = "Poison: "; + + [Inject] + private SleepTask() + { + } + + public void Dispose() + { + } + + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Info, Prefix + "Will sleep for 2 seconds (expecting to be poisoned faster)."); + Thread.Sleep(2000); + Logger.Log(Level.Info, Prefix + "Task sleep finished successfully."); + return null; + } + + public void OnNext(ICloseEvent value) + { + // handler for forceful shutdown in case of evaluator failure + // (to prevent throwing TaskCloseHandlerNotBoundException) + Logger.Log(Level.Info, Prefix + "Task stopped"); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void 
OnError(Exception error) + { + throw new NotImplementedException(); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithActiveContextDelayedPoison.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithActiveContextDelayedPoison.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithActiveContextDelayedPoison.cs new file mode 100644 index 0000000..6496ce2 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithActiveContextDelayedPoison.cs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Common.Poison; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Xunit; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; + +namespace Org.Apache.REEF.Tests.Functional.Failure +{ + [Collection("FunctionalTests")] + public sealed class TestEvaluatorWithActiveContextDelayedPoison : ReefFunctionalTest + { + [Fact] + [Trait("Description", "Test evaluator failure by injecting delayed fault in context start handler.")] + public void TestPoisonedActiveContextHandlerWithDelay() + { + var testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 1, "poisonedActiveContextWithDelayTest", "local", testFolder); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorWithActiveContextDriver.FailedEvaluatorMessage, testFolder); + + // verify that no unexpected events happened + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedClosedContext, testFolder, 0); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedFailedContext, testFolder, 0); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedCompletedTask, testFolder, 0); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedFailedTask, testFolder, 0); + CleanUp(testFolder); + } + + private static IConfiguration DriverConfigurations() + { + return DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextClosed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Build(); + } + + private sealed class PoisonedEvaluatorDriver : + BasePoisonedEvaluatorWithActiveContextDriver, + IObserver<IActiveContext> + { + [Inject] + private PoisonedEvaluatorDriver(IEvaluatorRequestor requestor) : base(requestor) + { + } + + public override void OnNext(IAllocatedEvaluator value) + { + var contextConfig = ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, ContextId) + .Set(ContextConfiguration.OnContextStart, GenericType<PoisonedEventHandler<IContextStart>>.Class) + .Build(); + + var poisonConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindIntNamedParam<CrashTimeout>("10") + .BindIntNamedParam<CrashMinDelay>("10") + .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0") + .Build(); + + value.SubmitContext(Configurations.Merge(contextConfig, poisonConfig)); + } + + public void OnNext(IActiveContext value) + { + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<SleepTask>.Class) + .Build(); + + value.SubmitTask(taskConfig); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithActiveContextImmediatePoison.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithActiveContextImmediatePoison.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithActiveContextImmediatePoison.cs new file mode 100644 index 0000000..9e3366f --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithActiveContextImmediatePoison.cs @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Common.Poison; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Xunit; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; + +namespace Org.Apache.REEF.Tests.Functional.Failure +{ + [Collection("FunctionalTests")] + public sealed class TestEvaluatorWithActiveContextImmediatePoison : ReefFunctionalTest + { + [Fact] + [Trait("Description", "Test evaluator failure by injecting immediate fault in context start handler.")] + public void TestPoisonedActiveContextHandlerImmediate() + { + var testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 1, "poisonedActiveContextImmediateTest", "local", testFolder); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorWithActiveContextDriver.FailedEvaluatorMessage, testFolder); + + // verify that no unexpected events happened + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedClosedContext, testFolder, 0); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedFailedContext, testFolder, 0); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedCompletedTask, testFolder, 0); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedFailedTask, testFolder, 0); + CleanUp(testFolder); + } + + private static IConfiguration DriverConfigurations() + { + return DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextClosed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Build(); + } + + private sealed class PoisonedEvaluatorDriver : + BasePoisonedEvaluatorWithActiveContextDriver, + IObserver<IActiveContext> + { + [Inject] + private PoisonedEvaluatorDriver(IEvaluatorRequestor requestor) : base(requestor) + { + } + + public override void OnNext(IAllocatedEvaluator value) + { + var contextConfig = ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, ContextId) + .Set(ContextConfiguration.OnContextStart, GenericType<PoisonedEventHandler<IContextStart>>.Class) + .Build(); + + var poisonConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindIntNamedParam<CrashTimeout>("0") + .BindIntNamedParam<CrashMinDelay>("0") + .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0") + .Build(); + + value.SubmitContext(Configurations.Merge(contextConfig, poisonConfig)); + } + + public void OnNext(IActiveContext value) + { + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<SleepTask>.Class) + .Build(); + + value.SubmitTask(taskConfig); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithCompletedTaskDelayedPoison.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithCompletedTaskDelayedPoison.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithCompletedTaskDelayedPoison.cs new file mode 100644 index 0000000..c8f5927 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithCompletedTaskDelayedPoison.cs @@ -0,0 +1,104 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Common.Poison; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Implementations.Configuration; +using Xunit; +using Org.Apache.REEF.Common.Tasks.Events; +using Org.Apache.REEF.Driver.Task; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.Tests.Functional.Failure +{ + [Collection("FunctionalTests")] + public sealed class TestEvaluatorWithCompletedTaskDelayedPoison : ReefFunctionalTest + { + private const string ExpectedCompletedTask = "A completed task was expected."; + + [Fact] + [Trait("Description", "Test evaluator failure by injecting immediate fault in completed task handler.")] + public void TestPoisonedCompletedTaskHandlerWithDelay() + { + var testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 1, "poisonedCompletedTaskWithDelayTest", "local", testFolder); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorWithActiveContextDriver.FailedEvaluatorMessage, testFolder); + ValidateMessageSuccessfullyLoggedForDriver(ExpectedCompletedTask, testFolder); + + // verify that no unexpected events happened + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedClosedContext, testFolder, 0); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedFailedContext, testFolder, 0); + ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedFailedTask, testFolder, 0); + CleanUp(testFolder); + } + + private static IConfiguration DriverConfigurations() + { + return DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, 
GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextClosed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskCompleted, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnTaskFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Build(); + } + + private sealed class PoisonedEvaluatorDriver : + BasePoisonedEvaluatorWithActiveContextDriver, + IObserver<IActiveContext> + { + [Inject] + private PoisonedEvaluatorDriver(IEvaluatorRequestor requestor) : base(requestor) + { + } + + public void OnNext(IActiveContext value) + { + var taskConfig = TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class) + .Set(TaskConfiguration.OnClose, GenericType<SleepTask>.Class) + .Set(TaskConfiguration.OnTaskStop, GenericType<PoisonedEventHandler<ITaskStop>>.Class) + .Build(); + + var poisonConfig = TangFactory.GetTang().NewConfigurationBuilder() + .BindIntNamedParam<CrashTimeout>("0") + .BindIntNamedParam<CrashMinDelay>("50") + .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0") + .Build(); + + value.SubmitTask(Configurations.Merge(taskConfig, poisonConfig)); + } + + public override void OnNext(ICompletedTask value) + { + Logger.Log(Level.Info, ExpectedCompletedTask); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithRunningTaskDelayedPoison.cs ---------------------------------------------------------------------- diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithRunningTaskDelayedPoison.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithRunningTaskDelayedPoison.cs
new file mode 100644
index 0000000..d14df40
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithRunningTaskDelayedPoison.cs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Common.Poison;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Failure
+{
+    /// <summary>
+    /// Functional test in which the evaluator is poisoned through the task's
+    /// <see cref="ITaskStart"/> handler, using non-zero crash delay parameters.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public sealed class TestEvaluatorWithRunningTaskDelayedPoison : ReefFunctionalTest
+    {
+        /// <summary>
+        /// Runs the poisoned driver, then checks the driver log: the failed-evaluator
+        /// message must be present and every "unexpected event" message must be absent.
+        /// </summary>
+        [Fact]
+        [Trait("Description", "Test evaluator failure by injecting delayed fault in task start handler.")]
+        public void TestPoisonedTaskStartHandlerWithDelay()
+        {
+            string runtimeFolder = DefaultRuntimeFolder + TestId;
+            IConfiguration driverConfig = DriverConfigurations();
+            TestRun(driverConfig, typeof(PoisonedEvaluatorDriver), 1, "poisonedTaskStartWithDelayTest", "local", runtimeFolder);
+
+            // The driver is expected to log that it saw the evaluator fail.
+            ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorWithRunningTaskDriver.FailedEvaluatorMessage, runtimeFolder);
+
+            // verify that no unexpected events happened (0 = message must not occur)
+            var unexpectedMessages = new string[]
+            {
+                BasePoisonedEvaluatorDriver.UnexpectedClosedContext,
+                BasePoisonedEvaluatorDriver.UnexpectedFailedContext,
+                BasePoisonedEvaluatorDriver.UnexpectedCompletedTask,
+                BasePoisonedEvaluatorDriver.UnexpectedFailedTask
+            };
+            foreach (string unexpected in unexpectedMessages)
+            {
+                ValidateMessageSuccessfullyLoggedForDriver(unexpected, runtimeFolder, 0);
+            }
+
+            CleanUp(runtimeFolder);
+        }
+
+        /// <summary>
+        /// Builds the driver configuration, registering <see cref="PoisonedEvaluatorDriver"/>
+        /// as the handler for every driver event set below.
+        /// </summary>
+        /// <returns>The built driver configuration.</returns>
+        private static IConfiguration DriverConfigurations()
+        {
+            var driver = GenericType<PoisonedEvaluatorDriver>.Class;
+            var module = DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, driver)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, driver)
+                .Set(DriverConfiguration.OnEvaluatorFailed, driver)
+                .Set(DriverConfiguration.OnContextActive, driver)
+                .Set(DriverConfiguration.OnContextClosed, driver)
+                .Set(DriverConfiguration.OnContextFailed, driver)
+                .Set(DriverConfiguration.OnTaskCompleted, driver)
+                .Set(DriverConfiguration.OnTaskFailed, driver);
+            return module.Build();
+        }
+
+        /// <summary>
+        /// Driver that submits a <see cref="SleepTask"/> whose task-start event is handled
+        /// by <see cref="PoisonedEventHandler{T}"/>.
+        /// </summary>
+        private sealed class PoisonedEvaluatorDriver :
+            BasePoisonedEvaluatorWithRunningTaskDriver,
+            IObserver<IActiveContext>
+        {
+            [Inject]
+            private PoisonedEvaluatorDriver(IEvaluatorRequestor requestor) : base(requestor)
+            {
+            }
+
+            /// <summary>
+            /// Submits the poisoned task to the active context.
+            /// </summary>
+            /// <param name="value">Active context that receives the task.</param>
+            public void OnNext(IActiveContext value)
+            {
+                // NOTE(review): the meaning and units of these parameters are defined by PoisonedEventHandler, which is not visible here - confirm there.
+                var poison = TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindIntNamedParam<CrashTimeout>("500")
+                    .BindIntNamedParam<CrashMinDelay>("100")
+                    .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0")
+                    .Build();
+
+                var task = TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, TaskId)
+                    .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class)
+                    .Set(TaskConfiguration.OnClose, GenericType<SleepTask>.Class)
+                    .Set(TaskConfiguration.OnTaskStart, GenericType<PoisonedEventHandler<ITaskStart>>.Class)
+                    .Build();
+
+                value.SubmitTask(Configurations.Merge(task, poison));
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithRunningTaskImmediatePoison.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithRunningTaskImmediatePoison.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithRunningTaskImmediatePoison.cs
new file mode 100644
index 0000000..0b2b513
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Failure/TestEvaluatorWithRunningTaskImmediatePoison.cs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Common.Poison;
+using Org.Apache.REEF.Common.Tasks.Events;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.Failure
+{
+    /// <summary>
+    /// Functional test in which the evaluator is poisoned through the task's
+    /// <see cref="ITaskStart"/> handler, with both crash delay parameters set to zero.
+    /// </summary>
+    [Collection("FunctionalTests")]
+    public sealed class TestEvaluatorWithRunningTaskImmediatePoison : ReefFunctionalTest
+    {
+        /// <summary>
+        /// Runs the poisoned driver, then checks the driver log: the failed-evaluator
+        /// message must be present and every "unexpected event" message must be absent.
+        /// </summary>
+        [Fact]
+        [Trait("Description", "Test evaluator failure by injecting immediate fault in task start handler.")]
+        public void TestPoisonedTaskStartHandlerImmediate()
+        {
+            var folder = DefaultRuntimeFolder + TestId;
+            TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 1, "poisonedTaskStartImmediateTest", "local", folder);
+
+            // The driver is expected to log that it saw the evaluator fail.
+            ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorWithRunningTaskDriver.FailedEvaluatorMessage, folder);
+
+            // verify that no unexpected events happened
+            const int zeroOccurrences = 0;
+            ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedClosedContext, folder, zeroOccurrences);
+            ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedFailedContext, folder, zeroOccurrences);
+            ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedCompletedTask, folder, zeroOccurrences);
+            ValidateMessageSuccessfullyLoggedForDriver(BasePoisonedEvaluatorDriver.UnexpectedFailedTask, folder, zeroOccurrences);
+            CleanUp(folder);
+        }
+
+        /// <summary>
+        /// Builds the driver configuration, registering <see cref="PoisonedEvaluatorDriver"/>
+        /// as the handler for every driver event set below.
+        /// </summary>
+        /// <returns>The built driver configuration.</returns>
+        private static IConfiguration DriverConfigurations()
+        {
+            var handler = GenericType<PoisonedEvaluatorDriver>.Class;
+            return DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, handler)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, handler)
+                .Set(DriverConfiguration.OnEvaluatorFailed, handler)
+                .Set(DriverConfiguration.OnContextActive, handler)
+                .Set(DriverConfiguration.OnContextClosed, handler)
+                .Set(DriverConfiguration.OnContextFailed, handler)
+                .Set(DriverConfiguration.OnTaskCompleted, handler)
+                .Set(DriverConfiguration.OnTaskFailed, handler)
+                .Build();
+        }
+
+        /// <summary>
+        /// Driver that submits a <see cref="SleepTask"/> whose task-start event is handled
+        /// by <see cref="PoisonedEventHandler{T}"/>.
+        /// </summary>
+        private sealed class PoisonedEvaluatorDriver :
+            BasePoisonedEvaluatorWithRunningTaskDriver,
+            IObserver<IActiveContext>
+        {
+            [Inject]
+            private PoisonedEvaluatorDriver(IEvaluatorRequestor requestor) : base(requestor)
+            {
+            }
+
+            /// <summary>
+            /// Submits the poisoned task to the active context.
+            /// </summary>
+            /// <param name="value">Active context that receives the task.</param>
+            public void OnNext(IActiveContext value)
+            {
+                value.SubmitTask(Configurations.Merge(TaskConfig(), PoisonConfig()));
+            }
+
+            /// <summary>
+            /// Builds the task configuration: a SleepTask with a poisoned task-start handler.
+            /// </summary>
+            private IConfiguration TaskConfig()
+            {
+                return TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, TaskId)
+                    .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class)
+                    .Set(TaskConfiguration.OnClose, GenericType<SleepTask>.Class)
+                    .Set(TaskConfiguration.OnTaskStart, GenericType<PoisonedEventHandler<ITaskStart>>.Class)
+                    .Build();
+            }
+
+            /// <summary>
+            /// Builds the configuration holding the three poison named parameters.
+            /// </summary>
+            private IConfiguration PoisonConfig()
+            {
+                // NOTE(review): the meaning and units of these parameters are defined by PoisonedEventHandler, which is not visible here - confirm there.
+                return TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindIntNamedParam<CrashTimeout>("0")
+                    .BindIntNamedParam<CrashMinDelay>("0")
+                    .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0")
+                    .Build();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
deleted file mode 100644
index b0d1c40..0000000
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
+++ /dev/null
@@ -1,186 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -using System; -using System.Linq; -using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Driver; -using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Driver.Evaluator; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Utilities.Logging; -using Xunit; -using System.Threading; -using Org.Apache.REEF.Common.Context; -using Org.Apache.REEF.Common.Poison; -using Org.Apache.REEF.Common.Tasks.Events; -using Org.Apache.REEF.Driver.Task; -using Org.Apache.REEF.Tang.Implementations.Tang; -using Org.Apache.REEF.Tang.Implementations.Configuration; - -namespace Org.Apache.REEF.Tests.Functional.FaultTolerant -{ - [Collection("FunctionalTests")] - public sealed class PoisonTest : ReefFunctionalTest - { - private static readonly Logger Logger = Logger.GetLogger(typeof(PoisonTest)); - - private const string Prefix = "Poison: "; - private const string FailedEvaluatorMessage = "I have succeeded in seeing a failed evaluator."; - private const string TaskId = "1234567"; - private const string ContextId = "ContextID"; - - [Fact] - [Trait("Description", "Test Poison functionality by injecting fault in context start handler.")] - public void TestPoisonedEvaluatorStartHandler() - { - string testFolder = DefaultRuntimeFolder + TestId; - TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 1, "poisonedEvaluatorStartTest", "local", testFolder); - ValidateMessageSuccessfullyLoggedForDriver(FailedEvaluatorMessage, testFolder); - CleanUp(testFolder); - } - - public IConfiguration DriverConfigurations() - { - return DriverConfiguration.ConfigurationModule - .Set(DriverConfiguration.OnDriverStarted, GenericType<PoisonedEvaluatorDriver>.Class) - .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<PoisonedEvaluatorDriver>.Class) - .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<PoisonedEvaluatorDriver>.Class) - .Set(DriverConfiguration.OnContextActive, 
GenericType<PoisonedEvaluatorDriver>.Class) - .Set(DriverConfiguration.OnTaskCompleted, GenericType<PoisonedEvaluatorDriver>.Class) - .Build(); - } - - private sealed class PoisonedEvaluatorDriver : - IObserver<IDriverStarted>, - IObserver<IAllocatedEvaluator>, - IObserver<IActiveContext>, - IObserver<IFailedEvaluator>, - IObserver<ICompletedTask> - { - private readonly IEvaluatorRequestor _requestor; - - [Inject] - private PoisonedEvaluatorDriver(IEvaluatorRequestor requestor) - { - _requestor = requestor; - } - - public void OnNext(IDriverStarted value) - { - _requestor.Submit(_requestor.NewBuilder().Build()); - } - - public void OnNext(IAllocatedEvaluator value) - { - value.SubmitContext(ContextConfiguration.ConfigurationModule - .Set(ContextConfiguration.Identifier, ContextId) - .Build()); - } - - public void OnNext(IActiveContext value) - { - var taskConfig = TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, TaskId) - .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class) - .Set(TaskConfiguration.OnTaskStart, GenericType<PoisonedEventHandler<ITaskStart>>.Class) - .Build(); - - var poisonConfig = TangFactory.GetTang().NewConfigurationBuilder() - .BindIntNamedParam<CrashTimeout>("500") - .BindIntNamedParam<CrashMinDelay>("100") - .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0") - .Build(); - - value.SubmitTask(Configurations.Merge(taskConfig, poisonConfig)); - } - - public void OnNext(IFailedEvaluator value) - { - Logger.Log(Level.Error, FailedEvaluatorMessage); - if (value.FailedTask.Value == null || !value.FailedTask.IsPresent()) - { - throw new Exception("No failed Task associated with failed Evaluator"); - } - - if (value.FailedTask.Value.Id != TaskId) - { - throw new Exception("Failed Task ID returned " + value.FailedTask.Value.Id - + ", was expecting Task ID " + TaskId); - } - - Logger.Log(Level.Info, "Received all expected failed Tasks."); - - const string expectedStr = "expected a 
single Context with Context ID " + ContextId + "."; - - if (value.FailedContexts == null) - { - throw new Exception("No Context was present but " + expectedStr); - } - - if (value.FailedContexts.Count != 1) - { - throw new Exception("Collection of failed Contexts contains " + value.FailedContexts.Count + " failed Contexts but only " + expectedStr); - } - - if (!value.FailedContexts.Select(ctx => ctx.Id).Contains(ContextId)) - { - throw new Exception("Collection of failed Contexts does not contain expected Context ID " + ContextId + "."); - } - - Logger.Log(Level.Info, "Received all expected failed Contexts."); - } - - public void OnNext(ICompletedTask value) - { - throw new Exception("A completed task was not expected."); - } - - public void OnError(Exception error) - { - throw new NotImplementedException(); - } - - public void OnCompleted() - { - throw new NotImplementedException(); - } - } - - private sealed class SleepTask : ITask - { - [Inject] - private SleepTask() - { - } - - public void Dispose() - { - } - - public byte[] Call(byte[] memento) - { - Logger.Log(Level.Info, Prefix + "Will sleep for 2 seconds (expecting to be poisoned faster)."); - Thread.Sleep(2000); - Logger.Log(Level.Info, Prefix + "Task sleep finished successfully."); - return null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs index ad38652..90487e0 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ReefFunctionalTest.cs @@ -197,16 +197,16 @@ namespace Org.Apache.REEF.Tests.Functional if (numberOfOccurrences > 0) { Assert.True(numberOfOccurrences == successIndicators.Count(), - "Expected number of message occurrences " + 
numberOfOccurrences + " differs from actual " + successIndicators.Count()); + "Expected number of message \"" + message + "\" occurrences " + numberOfOccurrences + " differs from actual " + successIndicators.Count()); } else if (numberOfOccurrences == 0) { Assert.True(0 == successIndicators.Count(), - "Message not expected to occur but occurs " + successIndicators.Count() + " times"); + "Message \"" + message + "\" not expected to occur but occurs " + successIndicators.Count() + " times"); } else { - Assert.True(successIndicators.Count() > 0, "Message expected to occur, but did not."); + Assert.True(successIndicators.Count() > 0, "Message \"" + message + "\" expected to occur, but did not."); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/fda3ee62/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 9f3a8f5..bf58810 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -84,6 +84,15 @@ under the License. 
<Compile Include="Functional\Bridge\TestSuspendTask.cs" /> <Compile Include="Functional\Bridge\TestUnhandledTaskException.cs" /> <Compile Include="Functional\Driver\DriverTestStartHandler.cs" /> + <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithActiveContextDriver.cs" /> + <Compile Include="Functional\Failure\BasePoisonedEvaluatorWithRunningTaskDriver.cs" /> + <Compile Include="Functional\Failure\BasePoisonedEvaluatorDriver.cs" /> + <Compile Include="Functional\Failure\TestEvaluatorWithActiveContextDelayedPoison.cs" /> + <Compile Include="Functional\Failure\TestEvaluatorWithCompletedTaskDelayedPoison.cs" /> + <Compile Include="Functional\Failure\TestEvaluatorWithRunningTaskDelayedPoison.cs" /> + <Compile Include="Functional\Failure\TestEvaluatorWithActiveContextImmediatePoison.cs" /> + <Compile Include="Functional\Failure\TestEvaluatorWithRunningTaskImmediatePoison.cs" /> + <Compile Include="Functional\Failure\SleepTask.cs" /> <Compile Include="Functional\FaultTolerant\TestContextStart.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitEvaluator.cs" /> <Compile Include="Functional\FaultTolerant\TestResubmitTask.cs" /> @@ -94,7 +103,6 @@ under the License. <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriverSpecifyingRuntimeName.cs" /> <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" /> <Compile Include="Functional\Driver\TestDriver.cs" /> - <Compile Include="Functional\FaultTolerant\PoisonTest.cs" /> <Compile Include="Functional\Messaging\MessageDriver.cs" /> <Compile Include="Functional\Messaging\MessageTask.cs" /> <Compile Include="Functional\Messaging\TestTaskMessage.cs" />
