Repository: reef Updated Branches: refs/heads/master 02ddd4a41 -> 72106b97c
[REEF-1381] Allow to add Observer for ActiveContextManager * Allow to add an observer for ActiveContextManager * Notify the observer when all contexts are received * Add test cases for it JIRA: [REEF-1381](https://issues.apache.org/jira/browse/REEF-1381) Pull request: This closes #990 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/72106b97 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/72106b97 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/72106b97 Branch: refs/heads/master Commit: 72106b97ce20a08907af053ac2c3698f199dcea9 Parents: 02ddd4a Author: Julia Wang <[email protected]> Authored: Thu May 5 14:19:56 2016 -0700 Committer: Mariia Mykhailova <[email protected]> Committed: Wed May 11 21:24:40 2016 -0700 ---------------------------------------------------------------------- .../TestActiveContextManager.cs | 56 +++++++++++++++++--- .../OnREEF/Driver/ActiveContextManager.cs | 47 +++++++++++++--- 2 files changed, 89 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/72106b97/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs index f2c01ad..d3d09d6 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs @@ -34,19 +34,23 @@ namespace Org.Apache.REEF.IMRU.Tests private const string ContextIdPrefix = "ContextId"; /// <summary> - /// Create a ActiveContextManager and add some IActiveContexts to it. + /// Create a ActiveContextManager and add total number of IActiveContexts to it. + /// Expect to receive notification in the observer /// </summary> /// <returns></returns> - private ActiveContextManager InitializeActiveContextManager() + private static ActiveContextManager InitializeActiveContextManager() { const int totalEvaluators = 5; var activeContextManager = new ActiveContextManager(totalEvaluators); + var contextObserver = new TestContextObserver(totalEvaluators); + activeContextManager.Subscribe(contextObserver); + for (int i = 0; i < totalEvaluators; i++) { activeContextManager.Add(CreateMockActiveContext(i)); } - Assert.True(activeContextManager.AreAllContextsReceived); Assert.Equal(totalEvaluators, activeContextManager.NumberOfActiveContexts); + Assert.Equal(totalEvaluators, contextObserver.NumberOfActiveContextsReceived()); return activeContextManager; } @@ -58,12 +62,13 @@ namespace Org.Apache.REEF.IMRU.Tests [Fact] public void TestValidAddRemoveCases() { + const int totalEvaluators = 3; var activeContextManager = InitializeActiveContextManager(); - activeContextManager.RemoveFailedContextInFailedEvaluator(CreateMockFailedEvaluator(new List<int> { 3 })); + activeContextManager.RemoveFailedContextInFailedEvaluator(CreateMockFailedEvaluator(new List<int> { totalEvaluators })); Assert.Equal(1, activeContextManager.NumberOfMissingContexts); activeContextManager.Remove(ContextIdPrefix + 4); - Assert.Equal(3, activeContextManager.NumberOfActiveContexts); + Assert.Equal(totalEvaluators, activeContextManager.NumberOfActiveContexts); } /// <summary> @@ -71,7 +76,8 @@ namespace Org.Apache.REEF.IMRU.Tests /// </summary> public void TestInvalidAddRemoveCases() { - var activeContextManager = new ActiveContextManager(3); + const int totalEvaluators = 3; + var activeContextManager = new ActiveContextManager(totalEvaluators); activeContextManager.Add(CreateMockActiveContext(1)); Action add = () => activeContextManager.Add(CreateMockActiveContext(1)); @@ -171,5 +177,43 @@ namespace Org.Apache.REEF.IMRU.Tests mockFailedEvalutor.FailedContexts.Returns(failedContexts); return mockFailedEvalutor; } + + /// <summary> + /// A Context Manager observer for test + /// </summary> + private sealed class TestContextObserver : IObserver<IDictionary<string, IActiveContext>> + { + private readonly int _totalExpected; + private IDictionary<string, IActiveContext> _contexts = null; + + internal TestContextObserver(int totalExpected) + { + _totalExpected = totalExpected; + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public int NumberOfActiveContextsReceived() + { + if (_contexts != null) + { + return _contexts.Count; + } + return 0; + } + + public void OnNext(IDictionary<string, IActiveContext> value) + { + _contexts = value; + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/72106b97/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs index 36c54ce..219a9f6 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +using System; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Globalization; @@ -31,15 +32,17 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// Manages active contexts for the driver /// </summary> [NotThreadSafe] - internal sealed class ActiveContextManager + internal sealed class ActiveContextManager : IDisposable { private static readonly Logger Logger = Logger.GetLogger(typeof(ActiveContextManager)); private readonly IDictionary<string, IActiveContext> _activeContexts = new Dictionary<string, IActiveContext>(); private readonly int _totalExpectedContexts; + private IObserver<IDictionary<string, IActiveContext>> _activeContextObserver; /// <summary> /// Constructor of ActiveContextManager /// totalContexts specify the total number of expected active contexts that driver needs + /// activeContextObserver will be notified when all active contexts are received. /// </summary> /// <param name="totalContexts"></param> internal ActiveContextManager(int totalContexts) @@ -64,16 +67,31 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> + /// Subscribe an observer of ActiveContextManager + /// </summary> + /// <param name="activeContextObserver"></param> + /// <returns></returns> + public IDisposable Subscribe(IObserver<IDictionary<string, IActiveContext>> activeContextObserver) + { + if (_activeContextObserver != null) + { + return null; + } + _activeContextObserver = activeContextObserver; + return this; + } + + /// <summary> /// Checks if all the requested contexts are received. /// </summary> - internal bool AreAllContextsReceived + private bool AreAllContextsReceived { get { return _totalExpectedContexts == NumberOfActiveContexts; } } /// <summary> - /// Adding an IActiveContext to the ActiveContext collection - /// Throw IMRUSystemException if the IActiveContext already exists or NumberOfActiveContexts has exceeded the total expected contexts + /// Adds an IActiveContext to the ActiveContext collection + /// Throws IMRUSystemException if the IActiveContext already exists or NumberOfActiveContexts has exceeded the total expected contexts /// </summary> /// <param name="activeContext"></param> internal void Add(IActiveContext activeContext) @@ -91,11 +109,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } _activeContexts.Add(activeContext.Id, activeContext); + + if (AreAllContextsReceived && _activeContextObserver != null) + { + _activeContextObserver.OnNext(_activeContexts); + } } /// <summary> - /// Remove an IActiveContext from the ActiveContext collection - /// Throw IMRUSystemException if the IActiveContext doesn't exist. + /// Removes an IActiveContext from the ActiveContext collection + /// Throws IMRUSystemException if the IActiveContext doesn't exist. /// </summary> /// <param name="activeContextId"></param> internal void Remove(string activeContextId) @@ -117,8 +140,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Given an IFailedEvaluator, remove associated IActiveContext from the collection - /// Throw IMRUSystemException if associated IActiveContext doesn't exist or + /// Given an IFailedEvaluator, removes associated IActiveContext from the collection + /// Throws IMRUSystemException if associated IActiveContext doesn't exist or /// if more than one IActiveContexts are associated with the IFailedEvaluator /// as current IMRU driver assumes that there is only one context associated with the IFailedEvalutor /// </summary> @@ -148,5 +171,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } } } + + /// <summary> + /// sets _activeContextObserver to null + /// </summary> + public void Dispose() + { + _activeContextObserver = null; + } } } \ No newline at end of file
