Repository: reef
Updated Branches:
  refs/heads/master 02ddd4a41 -> 72106b97c


[REEF-1381] Allow to add Observer for ActiveContextManager

* Allow to add an observer for ActiveContextManager
* Notify the observer when all contexts are received
* Add test cases for it

JIRA:
  [REEF-1381](https://issues.apache.org/jira/browse/REEF-1381)

Pull request:
  This closes #990


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/72106b97
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/72106b97
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/72106b97

Branch: refs/heads/master
Commit: 72106b97ce20a08907af053ac2c3698f199dcea9
Parents: 02ddd4a
Author: Julia Wang <[email protected]>
Authored: Thu May 5 14:19:56 2016 -0700
Committer: Mariia Mykhailova <[email protected]>
Committed: Wed May 11 21:24:40 2016 -0700

----------------------------------------------------------------------
 .../TestActiveContextManager.cs                 | 56 +++++++++++++++++---
 .../OnREEF/Driver/ActiveContextManager.cs       | 47 +++++++++++++---
 2 files changed, 89 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/72106b97/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
index f2c01ad..d3d09d6 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestActiveContextManager.cs
@@ -34,19 +34,23 @@ namespace Org.Apache.REEF.IMRU.Tests
         private const string ContextIdPrefix = "ContextId";
 
         /// <summary>
-        /// Create a ActiveContextManager and add some IActiveContexts to it.
+        /// Create a ActiveContextManager and add total number of 
IActiveContexts to it.
+        /// Expect to receive notification in the observer
         /// </summary>
         /// <returns></returns>
-        private ActiveContextManager InitializeActiveContextManager()
+        private static ActiveContextManager InitializeActiveContextManager()
         {
             const int totalEvaluators = 5;
             var activeContextManager = new 
ActiveContextManager(totalEvaluators);
+            var contextObserver = new TestContextObserver(totalEvaluators);
+            activeContextManager.Subscribe(contextObserver);
+
             for (int i = 0; i < totalEvaluators; i++)
             {
                 activeContextManager.Add(CreateMockActiveContext(i));
             }
-            Assert.True(activeContextManager.AreAllContextsReceived);
             Assert.Equal(totalEvaluators, 
activeContextManager.NumberOfActiveContexts);
+            Assert.Equal(totalEvaluators, 
contextObserver.NumberOfActiveContextsReceived());
 
             return activeContextManager;
         }
@@ -58,12 +62,13 @@ namespace Org.Apache.REEF.IMRU.Tests
         [Fact]
         public void TestValidAddRemoveCases()
         {
+            const int totalEvaluators = 3;
             var activeContextManager = InitializeActiveContextManager();
-            
activeContextManager.RemoveFailedContextInFailedEvaluator(CreateMockFailedEvaluator(new
 List<int> { 3 }));
+            
activeContextManager.RemoveFailedContextInFailedEvaluator(CreateMockFailedEvaluator(new
 List<int> { totalEvaluators }));
             Assert.Equal(1, activeContextManager.NumberOfMissingContexts);
 
             activeContextManager.Remove(ContextIdPrefix + 4);
-            Assert.Equal(3, activeContextManager.NumberOfActiveContexts);
+            Assert.Equal(totalEvaluators, 
activeContextManager.NumberOfActiveContexts);
         }
 
         /// <summary>
@@ -71,7 +76,8 @@ namespace Org.Apache.REEF.IMRU.Tests
         /// </summary>
         public void TestInvalidAddRemoveCases()
         {
-            var activeContextManager = new ActiveContextManager(3);
+            const int totalEvaluators = 3;
+            var activeContextManager = new 
ActiveContextManager(totalEvaluators);
             activeContextManager.Add(CreateMockActiveContext(1));
 
             Action add = () => 
activeContextManager.Add(CreateMockActiveContext(1));
@@ -171,5 +177,43 @@ namespace Org.Apache.REEF.IMRU.Tests
             mockFailedEvalutor.FailedContexts.Returns(failedContexts);
             return mockFailedEvalutor;
         }
+
+        /// <summary>
+        /// A Context Manager observer for test
+        /// </summary>
+        private sealed class TestContextObserver : 
IObserver<IDictionary<string, IActiveContext>>
+        {
+            private readonly int _totalExpected;
+            private IDictionary<string, IActiveContext> _contexts = null;
+
+            internal TestContextObserver(int totalExpected)
+            {
+                _totalExpected = totalExpected;
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public int NumberOfActiveContextsReceived()
+            {
+                if (_contexts != null)
+                {
+                    return _contexts.Count;                    
+                }
+                return 0;
+            }
+
+            public void OnNext(IDictionary<string, IActiveContext> value)
+            {
+                _contexts = value;
+            }
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/72106b97/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
index 36c54ce..219a9f6 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ActiveContextManager.cs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
 using System.Collections.Generic;
 using System.Collections.ObjectModel;
 using System.Globalization;
@@ -31,15 +32,17 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     /// Manages active contexts for the driver
     /// </summary>
     [NotThreadSafe]
-    internal sealed class ActiveContextManager
+    internal sealed class ActiveContextManager : IDisposable
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(ActiveContextManager));
         private readonly IDictionary<string, IActiveContext> _activeContexts = 
new Dictionary<string, IActiveContext>();
         private readonly int _totalExpectedContexts;
+        private IObserver<IDictionary<string, IActiveContext>> 
_activeContextObserver;
 
         /// <summary>
         /// Constructor of ActiveContextManager
         /// totalContexts specify the total number of expected active contexts 
that driver needs
+        /// activeContextObserver will be notified when all active contexts 
are received. 
         /// </summary>
         /// <param name="totalContexts"></param>
         internal ActiveContextManager(int totalContexts)
@@ -64,16 +67,31 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
+        /// Subscribe an observer of ActiveContextManager
+        /// </summary>
+        /// <param name="activeContextObserver"></param>
+        /// <returns></returns>
+        public IDisposable Subscribe(IObserver<IDictionary<string, 
IActiveContext>> activeContextObserver)
+        {
+            if (_activeContextObserver != null)
+            {
+                return null;
+            }
+            _activeContextObserver = activeContextObserver;
+            return this;
+        }
+
+        /// <summary>
         /// Checks if all the requested contexts are received. 
         /// </summary>
-        internal bool AreAllContextsReceived
+        private bool AreAllContextsReceived
         {
             get { return _totalExpectedContexts == NumberOfActiveContexts; }
         }
 
         /// <summary>
-        /// Adding an IActiveContext to the ActiveContext collection
-        /// Throw IMRUSystemException if the IActiveContext already exists or 
NumberOfActiveContexts has exceeded the total expected contexts
+        /// Adds an IActiveContext to the ActiveContext collection
+        /// Throws IMRUSystemException if the IActiveContext already exists or 
NumberOfActiveContexts has exceeded the total expected contexts
         /// </summary>
         /// <param name="activeContext"></param>
         internal void Add(IActiveContext activeContext)
@@ -91,11 +109,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
             }
 
             _activeContexts.Add(activeContext.Id, activeContext);
+
+            if (AreAllContextsReceived && _activeContextObserver != null)
+            {
+                _activeContextObserver.OnNext(_activeContexts);
+            }
         }
 
         /// <summary>
-        /// Remove an IActiveContext from the ActiveContext collection
-        /// Throw IMRUSystemException if the IActiveContext doesn't exist.
+        /// Removes an IActiveContext from the ActiveContext collection
+        /// Throws IMRUSystemException if the IActiveContext doesn't exist.
         /// </summary>
         /// <param name="activeContextId"></param>
         internal void Remove(string activeContextId)
@@ -117,8 +140,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Given an IFailedEvaluator, remove associated IActiveContext from 
the collection
-        /// Throw IMRUSystemException if associated IActiveContext doesn't 
exist or
+        /// Given an IFailedEvaluator, removes associated IActiveContext from 
the collection
+        /// Throws IMRUSystemException if associated IActiveContext doesn't 
exist or
         /// if more than one IActiveContexts are associated with the 
IFailedEvaluator
         /// as current IMRU driver assumes that there is only one context 
associated with the IFailedEvalutor
         /// </summary>
@@ -148,5 +171,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                 }
             }
         }
+
+        /// <summary>
+        /// sets _activeContextObserver to null 
+        /// </summary>
+        public void Dispose()
+        {
+            _activeContextObserver = null;
+        }
     }
 }
\ No newline at end of file

Reply via email to