Repository: reef Updated Branches: refs/heads/master f74a7e8b1 -> 97f9eca4b
[REEF-1249] Add REEF Poison to REEF.NET This change: * adds REEF Poison - simple fault-injection event handler * adds a test for Poison functionality JIRA: [REEF-1249](https://issues.apache.org/jira/browse/REEF-1249) Pull request: This closes #922 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/97f9eca4 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/97f9eca4 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/97f9eca4 Branch: refs/heads/master Commit: 97f9eca4b406512cc4a423e947f123684b304e5d Parents: f74a7e8 Author: Mariia Mykhailova <[email protected]> Authored: Tue Mar 15 14:57:48 2016 -0700 Committer: Julia Wang <[email protected]> Committed: Tue Apr 5 17:06:35 2016 -0700 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 1 + .../Poison/PoisonedEventHandler.cs | 125 ++++++++++++++++ .../Functional/FaultTolerant/PoisonTest.cs | 143 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + .../Time/Runtime/RuntimeClock.cs | 8 +- 5 files changed, 277 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index b897f89..9a935c1 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -117,6 +117,7 @@ under the License. 
<Compile Include="Jar\ResourceHelper.cs" /> <Compile Include="ITaskSubmittable.cs" /> <Compile Include="Client\Parameters\DriverConfigurationProviders.cs" /> + <Compile Include="Poison\PoisonedEventHandler.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Protobuf\ReefProtocol\ClientRuntime.pb.cs"> <ExcludeFromStyleCop>true</ExcludeFromStyleCop> http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs b/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs new file mode 100644 index 0000000..4ab6722 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Attributes; +using Org.Apache.REEF.Utilities.Logging; +using Org.Apache.REEF.Wake.Time.Event; +using Org.Apache.REEF.Wake.Time.Runtime; +using System; +using System.Reactive; +using System.Threading.Tasks; + +namespace Org.Apache.REEF.Common.Poison +{ + /// <summary> + /// Handler to process an event in a way that has a certain probability of failure within a certain interval of time. + /// </summary> + /// <typeparam name="T">The type of event</typeparam> + [Private] + public class PoisonedEventHandler<T> : IObserver<T> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(PoisonedEventHandler<T>)); + + private readonly double _crashProbability; + private readonly int _crashTimeout; + private readonly int _crashMinDelay; + private readonly RuntimeClock _clock; + + private readonly Random _rand = new Random(); + + [Inject] + private PoisonedEventHandler( + [Parameter(typeof(CrashProbability))] double crashProbability, + [Parameter(typeof(CrashTimeout))] int crashTimeout, + [Parameter(typeof(CrashMinDelay))] int crashMinDelay, + RuntimeClock clock) + { + _crashProbability = crashProbability; + _crashTimeout = crashTimeout; + _crashMinDelay = crashMinDelay; + _clock = clock; + } + + /// <summary> + /// Throws a PoisonException with probability CrashProbability between time CrashMinDelay and CrashMinDelay + CrashTimeout. 
+ /// </summary> + public void OnNext(T value) + { + Logger.Log(Level.Verbose, "Poisoned handler for {0}", typeof(T).FullName); + if (_rand.NextDouble() <= _crashProbability) + { + int timeToCrash = _rand.Next(_crashTimeout) + _crashMinDelay; + Logger.Log(Level.Info, "Poisoning successful, crashing in {0} msec.", timeToCrash); + if (timeToCrash == 0) + { + throw new PoisonException("Crashed at " + DateTime.Now); + } + IObserver<Alarm> poisonedAlarm = Observer.Create<Alarm>( + x => + { + Logger.Log(Level.Verbose, "Alarm firing"); + throw new PoisonException("Crashed at " + DateTime.Now); + }); + _clock.ScheduleAlarm(timeToCrash, poisonedAlarm); + } + else + { + Logger.Log(Level.Info, "No poisoning happens"); + } + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + /// <summary> + /// Exception thrown by PoisonedEventHandler. + /// </summary> + [Private] + public class PoisonException : Exception + { + public PoisonException(string s) : base(s) + { + } + } + + [Private] + [NamedParameter("The probability with which a crash will occur", "CrashProbability", "0.5")] + public class CrashProbability : Name<double> + { + } + + [Private] + [NamedParameter("The time window (in msec) after crash delay completes in which the crash will occur", "CrashTimeout", "1000")] + public class CrashTimeout : Name<int> + { + } + + [Private] + [NamedParameter("The time period (in msec) after event in which the crash is guaranteed to not occur", "CrashMinDelay", "0")] + public class CrashMinDelay : Name<int> + { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs new 
file mode 100644 index 0000000..72df68c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Tasks; +using Org.Apache.REEF.Driver; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; +using System.Threading; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Common.Poison; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Implementations.Configuration; + +namespace Org.Apache.REEF.Tests.Functional.FaultTolerant +{ + [Collection("FunctionalTests")] + public sealed class PoisonTest : ReefFunctionalTest + { + private static readonly Logger Logger = Logger.GetLogger(typeof(PoisonTest)); + + private const string FailedEvaluatorMessage = "I have succeeded in seeing a failed evaluator."; + private const string TaskId = "1234567"; + + [Fact] + [Trait("Description", "Test Poison functionality by injecting 
fault in context start handler.")] + public void TestPoisonedEvaluatorStartHanlder() + { + string testFolder = DefaultRuntimeFolder + TestId; + TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 1, "poisonedEvaluatorStartTest", "local", testFolder); + ValidateMessageSuccessfullyLoggedForDriver(FailedEvaluatorMessage, testFolder); + CleanUp(testFolder); + } + + public IConfiguration DriverConfigurations() + { + return DriverConfiguration.ConfigurationModule + .Set(DriverConfiguration.OnDriverStarted, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnEvaluatorFailed, GenericType<PoisonedEvaluatorDriver>.Class) + .Set(DriverConfiguration.OnContextActive, GenericType<PoisonedEvaluatorDriver>.Class) + .Build(); + } + + private sealed class PoisonedEvaluatorDriver : + IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IActiveContext>, + IObserver<IFailedEvaluator> + { + private readonly IEvaluatorRequestor _requestor; + + [Inject] + private PoisonedEvaluatorDriver(IEvaluatorRequestor requestor) + { + _requestor = requestor; + } + + public void OnNext(IDriverStarted value) + { + _requestor.Submit(_requestor.NewBuilder().Build()); + } + + public void OnNext(IAllocatedEvaluator value) + { + var c1 = ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, "ContextID") + .Set(ContextConfiguration.OnContextStart, GenericType<PoisonedEventHandler<IContextStart>>.Class) + .Build(); + + var c2 = TangFactory.GetTang().NewConfigurationBuilder() + .BindIntNamedParam<CrashTimeout>("500") + .BindIntNamedParam<CrashMinDelay>("100") + .BindNamedParameter<CrashProbability, double>(GenericType<CrashProbability>.Class, "1.0") + .Build(); + + value.SubmitContext(Configurations.Merge(c1, c2)); + } + + public void OnNext(IActiveContext value) + { + value.SubmitTask(TaskConfiguration.ConfigurationModule + 
.Set(TaskConfiguration.Identifier, TaskId) + .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class) + .Build()); + } + + public void OnNext(IFailedEvaluator value) + { + Logger.Log(Level.Error, FailedEvaluatorMessage); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + + private sealed class SleepTask : ITask + { + [Inject] + private SleepTask() + { + } + + public void Dispose() + { + } + + public byte[] Call(byte[] memento) + { + Logger.Log(Level.Verbose, "Will sleep for 2 seconds (expecting to be poisoned faster)."); + Thread.Sleep(2000); + return null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 5a36bb1..6c8f235 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -86,6 +86,7 @@ under the License. 
<Compile Include="Functional\FaultTolerant\TestContextStart.cs" /> <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" /> <Compile Include="Functional\Driver\TestDriver.cs" /> + <Compile Include="Functional\FaultTolerant\PoisonTest.cs" /> <Compile Include="Functional\Messaging\MessageDriver.cs" /> <Compile Include="Functional\Messaging\MessageTask.cs" /> <Compile Include="Functional\Messaging\TestTaskMessage.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs index 6404796..ae198fb 100644 --- a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs +++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs @@ -34,6 +34,7 @@ namespace Org.Apache.REEF.Wake.Time.Runtime { private static readonly Logger LOGGER = Logger.GetLogger(typeof(RuntimeClock)); + private static int NumberOfInstantiations = 0; private readonly ITimer _timer; private readonly PubSubSubject<Time> _handlers; private readonly PriorityQueue<Time> _schedule; @@ -73,6 +74,11 @@ namespace Org.Apache.REEF.Wake.Time.Runtime _runtimeStartHandler = runtimeStartHandler; _runtimeStopHandler = runtimeStopHandler; _idleHandler = idleHandler; + ++NumberOfInstantiations; + if (NumberOfInstantiations > 1) + { + LOGGER.Log(Level.Warning, "Instantiated `RuntimeClock` instance number " + NumberOfInstantiations); + } } public IInjectionFuture<ISet<IObserver<RuntimeStart>>> InjectedRuntimeStartHandler @@ -90,7 +96,7 @@ namespace Org.Apache.REEF.Wake.Time.Runtime /// <summary> /// Schedule a TimerEvent at the given future offset /// </summary> - /// <param name="offset">The offset in the future to schedule the alarm</param> + /// <param name="offset">The offset in the future to schedule the alarm, in msec</param> /// <param 
name="handler">The IObserver to to be called</param> public override void ScheduleAlarm(long offset, IObserver<Alarm> handler) {
