Repository: reef Updated Branches: refs/heads/master ec7bc969f -> dd724ffdc
[REEF-1146] Support Context Events This addresses the issue by * Supporting ContextStart and ContextStop. * Setting up the initial structure to support ContextMessages and messages from Driver to Context. JIRA: [REEF-1146](https://issues.apache.org/jira/browse/REEF-1146) Pull Request: This closes #816 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dd724ffd Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dd724ffd Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dd724ffd Branch: refs/heads/master Commit: dd724ffdcfc93c911392f29e848674cfeef5a7f3 Parents: ec7bc96 Author: Andrew Chung <[email protected]> Authored: Mon Feb 1 15:07:56 2016 -0800 Committer: Markus Weimer <[email protected]> Committed: Wed Feb 3 21:11:45 2016 -0800 ---------------------------------------------------------------------- .../Defaults/DefaultContextMessageSource.cs | 6 ++ .../Defaults/DefaultContextStartHandler.cs | 6 ++ .../Defaults/DefaultContextStopHandler.cs | 6 ++ .../Evaluator/Context/ContextLifeCycle.cs | 64 +++++++----- .../Runtime/Evaluator/Context/ContextRuntime.cs | 2 +- .../ContextRuntimeTests.cs | 103 +++++++++++++++++++ .../Org.Apache.REEF.Evaluator.Tests.csproj | 3 +- 7 files changed, 160 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs index 67c5ed0..ca701ea 100644 --- a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs +++ b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextMessageSource.cs @@ -15,6 +15,7 @@ // specific language governing 
permissions and limitations // under the License. +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities; namespace Org.Apache.REEF.Common.Context.Defaults @@ -24,6 +25,11 @@ namespace Org.Apache.REEF.Common.Context.Defaults /// </summary> public class DefaultContextMessageSource : IContextMessageSource { + [Inject] + private DefaultContextMessageSource() + { + } + public Optional<ContextMessage> Message { get http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs index 01422d1..5db4bac 100644 --- a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs +++ b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStartHandler.cs @@ -17,6 +17,7 @@ using System; using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Context.Defaults @@ -28,6 +29,11 @@ namespace Org.Apache.REEF.Common.Context.Defaults { private static readonly Logger Logger = Logger.GetLogger(typeof(DefaultContextStartHandler)); + [Inject] + private DefaultContextStartHandler() + { + } + public void OnNext(IContextStart contextStart) { Logger.Log(Level.Info, "DefaultContextStartHandler received for context: " + contextStart.Id); http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs index 8688314..c609eff 100644 --- 
a/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs +++ b/lang/cs/Org.Apache.REEF.Common/Context/Defaults/DefaultContextStopHandler.cs @@ -17,6 +17,7 @@ using System; using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Context.Defaults @@ -28,6 +29,11 @@ namespace Org.Apache.REEF.Common.Context.Defaults { private static readonly Logger Logger = Logger.GetLogger(typeof(DefaultContextStopHandler)); + [Inject] + private DefaultContextStopHandler() + { + } + public void OnNext(IContextStop contextStop) { Logger.Log(Level.Info, "DefaultContextStopHandler received for context: " + contextStop.Id); http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs index a987d28..46def9b 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextLifeCycle.cs @@ -28,13 +28,12 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// </summary> internal sealed class ContextLifeCycle { - private HashSet<IObserver<IContextStart>> _contextStartHandlers; + private readonly ISet<IObserver<IContextStart>> _contextStartHandlers; + private readonly ISet<IObserver<IContextStop>> _contextStopHandlers; + private readonly ISet<IContextMessageSource> _contextMessageSources; + private readonly ISet<IContextMessageHandler> _contextMessageHandlers; - private HashSet<IObserver<IContextStop>> _contextStopHandlers; - - private readonly HashSet<IContextMessageSource> _contextMessageSources; - - // TODO[JIRA REEF-1167]: Make method private. 
+ // TODO[JIRA REEF-1167]: Remove constructor.. [Inject] public ContextLifeCycle([Parameter(typeof(ContextConfigurationOptions.ContextIdentifier))] string contextId) { @@ -42,11 +41,27 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context _contextStartHandlers = new HashSet<IObserver<IContextStart>>(); _contextStopHandlers = new HashSet<IObserver<IContextStop>>(); _contextMessageSources = new HashSet<IContextMessageSource>(); + _contextMessageHandlers = new HashSet<IContextMessageHandler>(); + } + + [Inject] + private ContextLifeCycle( + [Parameter(typeof(ContextConfigurationOptions.ContextIdentifier))] string contextId, + [Parameter(typeof(ContextConfigurationOptions.StartHandlers))] ISet<IObserver<IContextStart>> contextStartHandlers, + [Parameter(typeof(ContextConfigurationOptions.StopHandlers))] ISet<IObserver<IContextStop>> contextStopHandlers, + [Parameter(typeof(ContextConfigurationOptions.ContextMessageSources))] ISet<IContextMessageSource> contextMessageSources, + [Parameter(typeof(ContextConfigurationOptions.ContextMessageHandlers))] ISet<IContextMessageHandler> contextMessageHandlers) + { + Id = contextId; + _contextStartHandlers = contextStartHandlers; + _contextStopHandlers = contextStopHandlers; + _contextMessageSources = contextMessageSources; + _contextMessageHandlers = contextMessageHandlers; } public string Id { get; private set; } - public HashSet<IContextMessageSource> ContextMessageSources + public ISet<IContextMessageSource> ContextMessageSources { get { return _contextMessageSources; } } @@ -57,12 +72,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context public void Start() { IContextStart contextStart = new ContextStartImpl(Id); - - ////TODO: enable - ////foreach (IObserver<IContextStart> startHandler in _contextStartHandlers) - ////{ - //// startHandler.OnNext(contextStart); - ////} + + foreach (var startHandler in _contextStartHandlers) + { + startHandler.OnNext(contextStart); + } } /// <summary> @@ -70,25 +84,19 @@ 
namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context /// </summary> public void Close() { - ////IContextStop contextStop = new ContextStopImpl(Id); - ////foreach (IObserver<IContextStop> startHandler in _contextStopHandlers) - ////{ - //// startHandler.OnNext(contextStop); - ////} + IContextStop contextStop = new ContextStopImpl(Id); + foreach (var stopHandler in _contextStopHandlers) + { + stopHandler.OnNext(contextStop); + } } public void HandleContextMessage(byte[] message) { - // contextMessageHandler.onNext(message); - } - - /// <summary> - /// get the set of ContextMessageSources configured - /// </summary> - /// <returns>(a shallow copy of) the set of ContextMessageSources configured.</returns> - public HashSet<IContextMessageSource> GetContextMessageSources() - { - return new HashSet<IContextMessageSource>(_contextMessageSources); + foreach (var contextMessageHandler in _contextMessageHandlers) + { + contextMessageHandler.OnNext(message); + } } } } http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs index 953e1cd..7a78904 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Context/ContextRuntime.cs @@ -28,7 +28,7 @@ using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Context { - internal sealed class ContextRuntime + internal sealed class ContextRuntime : IDisposable { private static readonly Logger LOGGER = Logger.GetLogger(typeof(ContextRuntime)); http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs new file mode 100644 index 0000000..a462940 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/ContextRuntimeTests.cs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Text; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Common.Runtime.Evaluator.Context; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities; +using Xunit; +using ContextConfiguration = Org.Apache.REEF.Common.Context.ContextConfiguration; + +namespace Org.Apache.REEF.Evaluator.Tests +{ + public sealed class ContextRuntimeTests + { + [Fact] + [Trait("Priority", "0")] + [Trait("Category", "Unit")] + public void TestContextEvents() + { + const string hello = "Hello!"; + var contextConfig = ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, "ID") + .Set(ContextConfiguration.OnContextStart, GenericType<ContextEventHandler>.Class) + .Set(ContextConfiguration.OnContextStop, GenericType<ContextEventHandler>.Class) + .Set(ContextConfiguration.OnMessage, GenericType<ContextEventHandler>.Class) + .Build(); + + var injector = TangFactory.GetTang().NewInjector(); + + var handler = new ContextEventHandler(); + injector.BindVolatileInstance(GenericType<ContextEventHandler>.Class, handler); + + using (var contextRuntime = new ContextRuntime(injector, contextConfig, + Optional<ContextRuntime>.Empty())) + { + contextRuntime.HandleContextMessage(Encoding.UTF8.GetBytes(hello)); + } + + Assert.True(handler.Started, "Handler did not receive the start signal."); + Assert.True(handler.Stopped, "Handler did not receive the stop signal."); + Assert.Equal(Encoding.UTF8.GetString(handler.MessageReceived), hello); + } + + private sealed class ContextEventHandler + : IObserver<IContextStart>, IObserver<IContextStop>, IContextMessageHandler + { + [Inject] + public ContextEventHandler() + { + } + + public bool Started { get; private set; } + + public bool Stopped { get; private set; } + + public byte[] MessageReceived { get; private set; } + + public void OnNext(IContextStart value) 
+ { + Started = true; + } + + public void OnNext(IContextStop value) + { + Stopped = true; + } + + public void OnNext(byte[] value) + { + MessageReceived = value; + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/dd724ffd/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj b/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj index 0118ac3..8046a27 100644 --- a/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Evaluator.Tests/Org.Apache.REEF.Evaluator.Tests.csproj @@ -60,6 +60,7 @@ under the License. </Reference> </ItemGroup> <ItemGroup> + <Compile Include="ContextRuntimeTests.cs" /> <Compile Include="EvaluatorConfigurationsTests.cs" /> <Compile Include="EvaluatorTests.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> @@ -119,4 +120,4 @@ under the License. <Error Condition="!Exists('$(SolutionDir)\packages\xunit.core.2.1.0\build\portable-net45+win8+wp8+wpa81\xunit.core.props')" Text="$([System.String]::Format('$(ErrorText)', '$(PackagesDir)\xunit.core.2.1.0\build\portable-net45+win8+wp8+wpa81\xunit.core.props'))" /> <Error Condition="!Exists('$(SolutionDir)\packages\xunit.runner.visualstudio.2.1.0\build\net20\xunit.runner.visualstudio.props')" Text="$([System.String]::Format('$(ErrorText)', '$(PackagesDir)\xunit.runner.visualstudio.2.1.0\build\net20\xunit.runner.visualstudio.props'))" /> </Target> -</Project> +</Project> \ No newline at end of file
