Repository: reef
Updated Branches:
  refs/heads/master f74a7e8b1 -> 97f9eca4b


[REEF-1249] Add REEF Poison to REEF.NET

This change:
 * adds REEF Poison - simple fault-injection event handler
 * adds a test for Poison functionality

JIRA:
  [REEF-1249](https://issues.apache.org/jira/browse/REEF-1249)

Pull request:
  This closes #922


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/97f9eca4
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/97f9eca4
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/97f9eca4

Branch: refs/heads/master
Commit: 97f9eca4b406512cc4a423e947f123684b304e5d
Parents: f74a7e8
Author: Mariia Mykhailova <[email protected]>
Authored: Tue Mar 15 14:57:48 2016 -0700
Committer: Julia Wang <[email protected]>
Committed: Tue Apr 5 17:06:35 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Common.csproj               |   1 +
 .../Poison/PoisonedEventHandler.cs              | 125 ++++++++++++++++
 .../Functional/FaultTolerant/PoisonTest.cs      | 143 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 .../Time/Runtime/RuntimeClock.cs                |   8 +-
 5 files changed, 277 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
index b897f89..9a935c1 100644
--- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
+++ b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj
@@ -117,6 +117,7 @@ under the License.
     <Compile Include="Jar\ResourceHelper.cs" />
     <Compile Include="ITaskSubmittable.cs" />
     <Compile Include="Client\Parameters\DriverConfigurationProviders.cs" />
+    <Compile Include="Poison\PoisonedEventHandler.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Protobuf\ReefProtocol\ClientRuntime.pb.cs">
       <ExcludeFromStyleCop>true</ExcludeFromStyleCop>

http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs 
b/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs
new file mode 100644
index 0000000..4ab6722
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Common/Poison/PoisonedEventHandler.cs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Time.Event;
+using Org.Apache.REEF.Wake.Time.Runtime;
+using System;
+using System.Reactive;
+using System.Threading.Tasks;
+
+namespace Org.Apache.REEF.Common.Poison
+{
+    /// <summary>
+    /// Handler to process an event in a way that has certain probability of 
failure within certain inverval of time.
+    /// </summary>
+    /// <typeparam name="T">The type of event</typeparam>
+    [Private]
+    public class PoisonedEventHandler<T> : IObserver<T>
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(PoisonedEventHandler<T>));
+
+        private readonly double _crashProbability;
+        private readonly int _crashTimeout;
+        private readonly int _crashMinDelay;
+        private readonly RuntimeClock _clock;
+
+        private readonly Random _rand = new Random();
+
+        [Inject]
+        private PoisonedEventHandler(
+            [Parameter(typeof(CrashProbability))] double crashProbability,
+            [Parameter(typeof(CrashTimeout))] int crashTimeout,
+            [Parameter(typeof(CrashMinDelay))] int crashMinDelay,
+            RuntimeClock clock) 
+        {
+            _crashProbability = crashProbability;
+            _crashTimeout = crashTimeout;
+            _crashMinDelay = crashMinDelay;
+            _clock = clock;
+        }
+
+        /// <summary>
+        /// Throws a PoisonException with probability CrashProbability between 
time CrashMinDelay and CrashMinDelay + CrashTimeout.
+        /// </summary>
+        public void OnNext(T value)
+        {
+            Logger.Log(Level.Verbose, "Poisoned handler for {0}", 
typeof(T).FullName);
+            if (_rand.NextDouble() <= _crashProbability)
+            {
+                int timeToCrash = _rand.Next(_crashTimeout) + _crashMinDelay;
+                Logger.Log(Level.Info, "Poisoning successful, crashing in {0} 
msec.", timeToCrash);
+                if (timeToCrash == 0)
+                {
+                    throw new PoisonException("Crashed at " + DateTime.Now);
+                }
+                IObserver<Alarm> poisonedAlarm = Observer.Create<Alarm>(
+                    x =>
+                    {
+                        Logger.Log(Level.Verbose, "Alarm firing");
+                        throw new PoisonException("Crashed at " + 
DateTime.Now);
+                    });
+                _clock.ScheduleAlarm(timeToCrash, poisonedAlarm);
+            }
+            else
+            {
+                Logger.Log(Level.Info, "No poisoning happens");
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+            throw new NotImplementedException();
+        }
+
+        public void OnCompleted()
+        {
+            throw new NotImplementedException();
+        }
+    }
+
+    /// <summary>
+    /// Exception thrown by PoisonedEventHandler.
+    /// </summary>
+    [Private]
+    public class PoisonException : Exception
+    {
+        public PoisonException(string s) : base(s)
+        {
+        }
+    }
+
+    [Private]
+    [NamedParameter("The probability with which a crash will occur", 
"CrashProbability", "0.5")]
+    public class CrashProbability : Name<double>
+    {
+    }
+
+    [Private]
+    [NamedParameter("The time window (in msec) after crash delay completes in 
which the crash will occur", "CrashTimeout", "1000")]
+    public class CrashTimeout : Name<int>
+    {
+    }
+
+    [Private]
+    [NamedParameter("The time period (in msec) after event in which the crash 
is guaranteed to not occur", "CrashMinDelay", "0")]
+    public class CrashMinDelay : Name<int>
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
new file mode 100644
index 0000000..72df68c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/PoisonTest.cs
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+using System.Threading;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Common.Poison;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+
+namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
+{
+    [Collection("FunctionalTests")]
+    public sealed class PoisonTest : ReefFunctionalTest
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(PoisonTest));
+
+        private const string FailedEvaluatorMessage = "I have succeeded in 
seeing a failed evaluator.";
+        private const string TaskId = "1234567";
+
+        [Fact]
+        [Trait("Description", "Test Poison functionality by injecting fault in 
context start handler.")]
+        public void TestPoisonedEvaluatorStartHanlder()
+        {
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestRun(DriverConfigurations(), typeof(PoisonedEvaluatorDriver), 
1, "poisonedEvaluatorStartTest", "local", testFolder);
+            ValidateMessageSuccessfullyLoggedForDriver(FailedEvaluatorMessage, 
testFolder);
+            CleanUp(testFolder);
+        }
+
+        public IConfiguration DriverConfigurations()
+        {
+            return DriverConfiguration.ConfigurationModule
+                .Set(DriverConfiguration.OnDriverStarted, 
GenericType<PoisonedEvaluatorDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<PoisonedEvaluatorDriver>.Class)
+                .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<PoisonedEvaluatorDriver>.Class)
+                .Set(DriverConfiguration.OnContextActive, 
GenericType<PoisonedEvaluatorDriver>.Class)
+                .Build();
+        }
+
+        private sealed class PoisonedEvaluatorDriver :
+            IObserver<IDriverStarted>,
+            IObserver<IAllocatedEvaluator>,
+            IObserver<IActiveContext>,
+            IObserver<IFailedEvaluator>
+        {
+            private readonly IEvaluatorRequestor _requestor;
+
+            [Inject]
+            private PoisonedEvaluatorDriver(IEvaluatorRequestor requestor)
+            {
+                _requestor = requestor;
+            }
+
+            public void OnNext(IDriverStarted value)
+            {
+                _requestor.Submit(_requestor.NewBuilder().Build());
+            }
+
+            public void OnNext(IAllocatedEvaluator value)
+            {
+                var c1 = ContextConfiguration.ConfigurationModule
+                    .Set(ContextConfiguration.Identifier, "ContextID")
+                    .Set(ContextConfiguration.OnContextStart, 
GenericType<PoisonedEventHandler<IContextStart>>.Class)
+                    .Build();
+
+                var c2 = TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindIntNamedParam<CrashTimeout>("500")
+                    .BindIntNamedParam<CrashMinDelay>("100")
+                    .BindNamedParameter<CrashProbability, 
double>(GenericType<CrashProbability>.Class, "1.0")
+                    .Build();
+
+                value.SubmitContext(Configurations.Merge(c1, c2));
+            }
+
+            public void OnNext(IActiveContext value)
+            {
+                value.SubmitTask(TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, TaskId)
+                    .Set(TaskConfiguration.Task, GenericType<SleepTask>.Class)
+                    .Build());
+            }
+
+            public void OnNext(IFailedEvaluator value)
+            {
+                Logger.Log(Level.Error, FailedEvaluatorMessage);
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+
+        private sealed class SleepTask : ITask
+        {
+            [Inject]
+            private SleepTask()
+            {
+            }
+
+            public void Dispose()
+            {
+            }
+
+            public byte[] Call(byte[] memento)
+            {
+                Logger.Log(Level.Verbose, "Will sleep for 2 seconds (expecting 
to be poisoned faster).");
+                Thread.Sleep(2000);
+                return null;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 5a36bb1..6c8f235 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -86,6 +86,7 @@ under the License.
     <Compile Include="Functional\FaultTolerant\TestContextStart.cs" />
     <Compile Include="Functional\RuntimeName\EvaluatorRequestingDriver.cs" />
     <Compile Include="Functional\Driver\TestDriver.cs" />
+    <Compile Include="Functional\FaultTolerant\PoisonTest.cs" />
     <Compile Include="Functional\Messaging\MessageDriver.cs" />
     <Compile Include="Functional\Messaging\MessageTask.cs" />
     <Compile Include="Functional\Messaging\TestTaskMessage.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/97f9eca4/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs 
b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
index 6404796..ae198fb 100644
--- a/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
+++ b/lang/cs/Org.Apache.REEF.Wake/Time/Runtime/RuntimeClock.cs
@@ -34,6 +34,7 @@ namespace Org.Apache.REEF.Wake.Time.Runtime
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(RuntimeClock));
 
+        private static int NumberOfInstantiations = 0;
         private readonly ITimer _timer;
         private readonly PubSubSubject<Time> _handlers;
         private readonly PriorityQueue<Time> _schedule;
@@ -73,6 +74,11 @@ namespace Org.Apache.REEF.Wake.Time.Runtime
             _runtimeStartHandler = runtimeStartHandler;
             _runtimeStopHandler = runtimeStopHandler;
             _idleHandler = idleHandler;
+            ++NumberOfInstantiations;
+            if (NumberOfInstantiations > 1)
+            {
+                LOGGER.Log(Level.Warning, "Instantiated `RuntimeClock` 
instance number " + NumberOfInstantiations);
+            }
         }
 
         public IInjectionFuture<ISet<IObserver<RuntimeStart>>> 
InjectedRuntimeStartHandler
@@ -90,7 +96,7 @@ namespace Org.Apache.REEF.Wake.Time.Runtime
         /// <summary>
         /// Schedule a TimerEvent at the given future offset
         /// </summary>
-        /// <param name="offset">The offset in the future to schedule the 
alarm</param>
+        /// <param name="offset">The offset in the future to schedule the 
alarm, in msec</param>
         /// <param name="handler">The IObserver to to be called</param>
         public override void ScheduleAlarm(long offset, IObserver<Alarm> 
handler)
         {

Reply via email to