Repository: reef
Updated Branches:
  refs/heads/master ec7bc969f -> dd724ffdc


[REEF-1146] Support Context Events

This addressed the issue by
  * Support ContextStart and ContextStop.
  * Set up initial structure to support ContextMessages and messages from 
Driver to Context.

JIRA:
  [REEF-1146](https://issues.apache.org/jira/browse/REEF-1146)

Pull Request:
  This closes #816


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dd724ffd
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dd724ffd
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dd724ffd

Branch: refs/heads/master
Commit: dd724ffdcfc93c911392f29e848674cfeef5a7f3
Parents: ec7bc96
Author: Andrew Chung <[email protected]>
Authored: Mon Feb 1 15:07:56 2016 -0800
Committer: Markus Weimer <[email protected]>
Committed: Wed Feb 3 21:11:45 2016 -0800

----------------------------------------------------------------------
 .../Defaults/DefaultContextMessageSource.cs     |   6 ++
 .../Defaults/DefaultContextStartHandler.cs      |   6 ++
 .../Defaults/DefaultContextStopHandler.cs       |   6 ++
 .../Evaluator/Context/ContextLifeCycle.cs       |  64 +++++++-----
 .../Runtime/Evaluator/Context/ContextRuntime.cs |   2 +-
 .../ContextRuntimeTests.cs                      | 103 +++++++++++++++++++
 .../Org.Apache.REEF.Evaluator.Tests.csproj      |   3 +-
 7 files changed, 160 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs
 
b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs
index 67c5ed0..ca701ea 100644
--- 
a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs
+++ 
b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities;
 
 namespace Org.Apache.REEF.Common.Context.Defaults
@@ -24,6 +25,11 @@ namespace Org.Apache.REEF.Common.Context.Defaults
     /// </summary>
     public class DefaultContextMessageSource : IContextMessageSource
     {
+        [Inject]
+        private DefaultContextMessageSource()
+        {
+        }
+
         public Optional<ContextMessage> Message
         {
             get

http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs 
b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs
index 01422d1..5db4bac 100644
--- 
a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs
+++ 
b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs
@@ -17,6 +17,7 @@
 
 using System;
 using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Context.Defaults
@@ -28,6 +29,11 @@ namespace Org.Apache.REEF.Common.Context.Defaults
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(DefaultContextStartHandler));
 
+        [Inject]
+        private DefaultContextStartHandler()
+        {
+        }
+
         public void OnNext(IContextStart contextStart)
         {
             Logger.Log(Level.Info, "DefaultContextStartHandler received for 
context: " + contextStart.Id);

http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs 
b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs
index 8688314..c609eff 100644
--- 
a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs
+++ 
b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs
@@ -17,6 +17,7 @@
 
 using System;
 using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Context.Defaults
@@ -28,6 +29,11 @@ namespace Org.Apache.REEF.Common.Context.Defaults
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(DefaultContextStopHandler));
 
+        [Inject]
+        private DefaultContextStopHandler()
+        {
+        }
+
         public void OnNext(IContextStop contextStop)
         {
             Logger.Log(Level.Info, "DefaultContextStopHandler received for 
context: " + contextStop.Id);

http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs
index a987d28..46def9b 100644
--- 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs
+++ 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs
@@ -28,13 +28,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
     /// </summary>
     internal sealed class ContextLifeCycle
     {
-        private HashSet<IObserver<IContextStart>> _contextStartHandlers;
+        private readonly ISet<IObserver<IContextStart>> _contextStartHandlers;
+        private readonly ISet<IObserver<IContextStop>> _contextStopHandlers;
+        private readonly ISet<IContextMessageSource> _contextMessageSources;
+        private readonly ISet<IContextMessageHandler> _contextMessageHandlers;
 
-        private HashSet<IObserver<IContextStop>> _contextStopHandlers;
-
-        private readonly HashSet<IContextMessageSource> _contextMessageSources;
-
-        // TODO[JIRA REEF-1167]: Make method private.
+        // TODO[JIRA REEF-1167]: Remove constructor..
         [Inject]
         public 
ContextLifeCycle([Parameter(typeof(ContextConfigurationOptions.ContextIdentifier))]
 string contextId)
         {
@@ -42,11 +41,27 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
             _contextStartHandlers = new HashSet<IObserver<IContextStart>>();
             _contextStopHandlers = new HashSet<IObserver<IContextStop>>();
             _contextMessageSources = new HashSet<IContextMessageSource>();
+            _contextMessageHandlers = new HashSet<IContextMessageHandler>();
+        }
+
+        [Inject]
+        private ContextLifeCycle(
+            [Parameter(typeof(ContextConfigurationOptions.ContextIdentifier))] 
string contextId,
+            [Parameter(typeof(ContextConfigurationOptions.StartHandlers))] 
ISet<IObserver<IContextStart>> contextStartHandlers,
+            [Parameter(typeof(ContextConfigurationOptions.StopHandlers))] 
ISet<IObserver<IContextStop>> contextStopHandlers,
+            
[Parameter(typeof(ContextConfigurationOptions.ContextMessageSources))] 
ISet<IContextMessageSource> contextMessageSources,
+            
[Parameter(typeof(ContextConfigurationOptions.ContextMessageHandlers))] 
ISet<IContextMessageHandler> contextMessageHandlers)
+        {
+            Id = contextId;
+            _contextStartHandlers = contextStartHandlers;
+            _contextStopHandlers = contextStopHandlers;
+            _contextMessageSources = contextMessageSources;
+            _contextMessageHandlers = contextMessageHandlers;
         }
 
         public string Id { get; private set; }
 
-        public HashSet<IContextMessageSource> ContextMessageSources
+        public ISet<IContextMessageSource> ContextMessageSources
         {
             get { return _contextMessageSources; }
         }
@@ -57,12 +72,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         public void Start()
         {
             IContextStart contextStart = new ContextStartImpl(Id);
-            
-            ////TODO: enable
-            ////foreach (IObserver<IContextStart> startHandler in 
_contextStartHandlers)
-            ////{
-            ////   startHandler.OnNext(contextStart);
-            ////}
+
+            foreach (var startHandler in _contextStartHandlers)
+            {
+                startHandler.OnNext(contextStart);
+            }
         }
 
         /// <summary>
@@ -70,25 +84,19 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
         /// </summary>
         public void Close()
         {
-            ////IContextStop contextStop = new ContextStopImpl(Id);
-            ////foreach (IObserver<IContextStop> startHandler in 
_contextStopHandlers)
-            ////{
-            ////   startHandler.OnNext(contextStop);
-            ////}
+            IContextStop contextStop = new ContextStopImpl(Id);
+            foreach (var stopHandler in _contextStopHandlers)
+            {
+                stopHandler.OnNext(contextStop);
+            }
         }
 
         public void HandleContextMessage(byte[] message)
         {
-            // contextMessageHandler.onNext(message);
-        }
-
-        /// <summary>
-        /// get the set of ContextMessageSources configured
-        /// </summary>
-        /// <returns>(a shallow copy of) the set of ContextMessageSources 
configured.</returns>
-        public HashSet<IContextMessageSource> GetContextMessageSources()
-        {
-            return new HashSet<IContextMessageSource>(_contextMessageSources);
+            foreach (var contextMessageHandler in _contextMessageHandlers)
+            {
+                contextMessageHandler.OnNext(message);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs 
b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
index 953e1cd..7a78904 100644
--- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
+++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs
@@ -28,7 +28,7 @@ using Org.Apache.REEF.Utilities.Logging;
 
 namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context
 {
-    internal sealed class ContextRuntime
+    internal sealed class ContextRuntime : IDisposable
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(ContextRuntime));
 

http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs
new file mode 100644
index 0000000..a462940
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Text;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Common.Runtime.Evaluator.Context;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities;
+using Xunit;
+using ContextConfiguration = 
Org.Apache.REEF.Common.Context.ContextConfiguration;
+
+namespace Org.Apache.REEF.Evaluator.Tests
+{
+    public sealed class ContextRuntimeTests
+    {
+        [Fact]
+        [Trait("Priority", "0")]
+        [Trait("Category", "Unit")]
+        public void TestContextEvents()
+        {
+            const string hello = "Hello!";
+            var contextConfig = ContextConfiguration.ConfigurationModule
+                .Set(ContextConfiguration.Identifier, "ID")
+                .Set(ContextConfiguration.OnContextStart, 
GenericType<ContextEventHandler>.Class)
+                .Set(ContextConfiguration.OnContextStop, 
GenericType<ContextEventHandler>.Class)
+                .Set(ContextConfiguration.OnMessage, 
GenericType<ContextEventHandler>.Class)
+                .Build();
+            
+            var injector = TangFactory.GetTang().NewInjector();
+
+            var handler = new ContextEventHandler();
+            
injector.BindVolatileInstance(GenericType<ContextEventHandler>.Class, handler);
+
+            using (var contextRuntime = new ContextRuntime(injector, 
contextConfig,
+                    Optional<ContextRuntime>.Empty()))
+            {
+                
contextRuntime.HandleContextMessage(Encoding.UTF8.GetBytes(hello));
+            }
+
+            Assert.True(handler.Started, "Handler did not receive the start 
signal.");
+            Assert.True(handler.Stopped, "Handler did not receive the stop 
signal.");
+            Assert.Equal(Encoding.UTF8.GetString(handler.MessageReceived), 
hello);
+        }
+
+        private sealed class ContextEventHandler 
+            : IObserver<IContextStart>, IObserver<IContextStop>, 
IContextMessageHandler
+        {
+            [Inject]
+            public ContextEventHandler()
+            {
+            }
+
+            public bool Started { get; private set; }
+
+            public bool Stopped { get; private set; }
+
+            public byte[] MessageReceived { get; private set; }
+
+            public void OnNext(IContextStart value)
+            {
+                Started = true;
+            }
+
+            public void OnNext(IContextStop value)
+            {
+                Stopped = true;
+            }
+
+            public void OnNext(byte[] value)
+            {
+                MessageReceived = value;
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj
 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj
index 0118ac3..8046a27 100644
--- 
a/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj
+++ 
b/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj
@@ -60,6 +60,7 @@ under the License.
     </Reference>
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="ContextRuntimeTests.cs" />
     <Compile Include="EvaluatorConfigurationsTests.cs" />
     <Compile Include="EvaluatorTests.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
@@ -119,4 +120,4 @@ under the License.
     <Error 
Condition="!Exists('$(SolutionDir)\packages\xunit.core.2.1.0\build\portable-net45+win8+wp8+wpa81\xunit.core.props')"
 Text="$([System.String]::Format('$(ErrorText)', 
'$(PackagesDir)\xunit.core.2.1.0\build\portable-net45+win8+wp8+wpa81\xunit.core.props'))"
 />
     <Error 
Condition="!Exists('$(SolutionDir)\packages\xunit.runner.visualstudio.2.1.0\build\net20\xunit.runner.visualstudio.props')"
 Text="$([System.String]::Format('$(ErrorText)', 
'$(PackagesDir)\xunit.runner.visualstudio.2.1.0\build\net20\xunit.runner.visualstudio.props'))"
 />
   </Target>
-</Project>
+</Project>
\ No newline at end of file

Reply via email to