Repository: reef
Updated Branches:
  refs/heads/master fda3ee620 -> 5a8b518e3


[REEF-1378] Creating Evaluator Manager for Fault Tolerant

Adds an EvaluatorManager that manages allocated and failed evaluators, 
mainly for bookkeeping: it tracks failed evaluators and the master 
evaluator.
Also adds test cases for it.

JIRA: [REEF-1378](https://issues.apache.org/jira/browse/REEF-1378)
This closes #987


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/5a8b518e
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/5a8b518e
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/5a8b518e

Branch: refs/heads/master
Commit: 5a8b518e362743e087d5a20627bba12977578ab4
Parents: fda3ee6
Author: Julia Wang <[email protected]>
Authored: Wed May 4 10:23:56 2016 -0700
Committer: Andrew Chung <[email protected]>
Committed: Mon May 9 09:43:03 2016 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.IMRU.Tests.csproj           |   1 +
 .../TestEvaluatorManager.cs                     | 191 +++++++++++
 .../OnREEF/Driver/EvaluatorManager.cs           | 334 +++++++++++++++++++
 .../OnREEF/Driver/EvaluatorSpecification.cs     |  46 +++
 .../Org.Apache.REEF.IMRU.csproj                 |   2 +
 .../FaultTolerant/TestContextStart.cs           |   2 +-
 6 files changed, 575 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/5a8b518e/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
index 5c50039..415a6db 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/Org.Apache.REEF.IMRU.Tests.csproj
@@ -52,6 +52,7 @@ under the License.
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="MapperCountTest.cs" />
     <Compile Include="TestActiveContextManager.cs" />
+    <Compile Include="TestEvaluatorManager.cs" />
     <Compile Include="TestSystemStates.cs" />
     <Compile Include="TestTaskStates.cs" />
   </ItemGroup>

http://git-wip-us.apache.org/repos/asf/reef/blob/5a8b518e/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
new file mode 100644
index 0000000..489c725
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using NSubstitute;
+using Xunit;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+
+namespace Org.Apache.REEF.IMRU.Tests
+{
+    /// <summary>
+    /// Unit tests for EvaluatorManager: adding, failing and resetting Evaluators,
+    /// and the error cases around the master Evaluator and the expected Evaluator count.
+    /// </summary>
+    public sealed class TestEvaluatorManager
+    {
+        private const string EvaluatorIdPrefix = "EvaluatorId";
+        private int _nextMasterBatchSequence = 0;
+        private int _nextMapperBatchSequence = 0;
+
+        /// <summary>
+        /// Adds a full set of Evaluators, fails the master, resets the failures
+        /// and verifies that a new master can be added again.
+        /// </summary>
+        [Fact]
+        public void TestValidAddRemoveAllocatedEvaluator()
+        {
+            var manager = CreateTestEvaluators(3, 1);
+            Assert.Equal(3, manager.NumberOfAllocatedEvaluators);
+            Assert.True(manager.AreAllEvaluatorsAllocated());
+            Assert.True(manager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
+            Assert.False(manager.IsMasterEvaluatorId(EvaluatorIdPrefix + 2));
+            Assert.True(manager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2));
+            Assert.False(manager.IsMasterEvaluatorFailed());
+
+            manager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            Assert.Equal(2, manager.NumberOfAllocatedEvaluators);
+            Assert.True(manager.IsMasterEvaluatorFailed());
+            Assert.Equal(0, manager.NumberofFailedMappers());
+
+            manager.ResetFailedEvaluators();
+            AddMaster(manager, 1);
+            Assert.True(manager.AreAllEvaluatorsAllocated());
+        }
+
+        /// <summary>
+        /// Error case: all expected Evaluators are mappers, no master Evaluator was allocated.
+        /// </summary>
+        [Fact]
+        public void TestNoMasterEvaluator()
+        {
+            var manager = CreateEvaluatorManager(3, 1);
+            AddMapper(manager, 1);
+            AddMapper(manager, 2);
+
+            Action addLastMapper = () => AddMapper(manager, 3);
+            Assert.Throws<IMRUSystemException>(addLastMapper);
+        }
+
+        /// <summary>
+        /// Error case: a second master Evaluator is allocated.
+        /// </summary>
+        [Fact]
+        public void TestTwoMasterEvaluator()
+        {
+            var manager = CreateEvaluatorManager(3, 1);
+            AddMaster(manager, 1);
+            AddMapper(manager, 2);
+
+            Action addSecondMaster = () => AddMaster(manager, 3);
+            Assert.Throws<IMRUSystemException>(addSecondMaster);
+        }
+
+        /// <summary>
+        /// Error case: more Evaluators are allocated than the expected total.
+        /// </summary>
+        [Fact]
+        public void TestTooManyEvaluators()
+        {
+            var manager = CreateEvaluatorManager(2, 1);
+            AddMaster(manager, 1);
+            AddMapper(manager, 2);
+
+            Action addExtraMapper = () => AddMapper(manager, 3);
+            Assert.Throws<IMRUSystemException>(addExtraMapper);
+        }
+
+        /// <summary>
+        /// Error case: recording a failure for an Evaluator that was never allocated.
+        /// </summary>
+        [Fact]
+        public void TestRemoveInvalidEvaluators()
+        {
+            var manager = CreateTestEvaluators(3, 1);
+            manager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
+
+            Action failUnknownEvaluator = () => manager.RecordFailedEvaluator(EvaluatorIdPrefix + 4);
+            Assert.Throws<IMRUSystemException>(failUnknownEvaluator);
+        }
+
+        /// <summary>
+        /// Resetting the failed Evaluators clears the failure records and the master Evaluator id.
+        /// </summary>
+        [Fact]
+        public void TestResetFailedEvaluators()
+        {
+            var manager = CreateTestEvaluators(3, 1);
+            manager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            manager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
+            Assert.Equal(2, manager.NumberOfMissingEvaluators());
+
+            manager.ResetFailedEvaluators();
+            Assert.Equal(0, manager.NumberofFailedMappers());
+            Assert.False(manager.IsMasterEvaluatorId(EvaluatorIdPrefix + 1));
+            Assert.False(manager.IsMasterEvaluatorFailed());
+        }
+
+        /// <summary>
+        /// The maximum number of Evaluator failures is reported as reached
+        /// once as many Evaluators have failed as are allowed to fail.
+        /// </summary>
+        [Fact]
+        public void TestReachedMaximumNumberOfEvaluatorFailures()
+        {
+            var manager = CreateTestEvaluators(3, 2);
+            manager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            manager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
+            Assert.True(manager.ReachedMaximumNumberOfEvaluatorFailures());
+        }
+
+        /// <summary>
+        /// Creates an EvaluatorManager and fills it with mock Evaluators:
+        /// Evaluator 1 is the master, Evaluators 2..totalEvaluators are mappers.
+        /// </summary>
+        /// <param name="totalEvaluators">Total number of Evaluators the manager expects</param>
+        /// <param name="allowedNumberOfEvaluatorFailures">Allowed number of Evaluator failures</param>
+        /// <returns>The populated EvaluatorManager</returns>
+        private EvaluatorManager CreateTestEvaluators(int totalEvaluators, int allowedNumberOfEvaluatorFailures)
+        {
+            var manager = CreateEvaluatorManager(totalEvaluators, allowedNumberOfEvaluatorFailures);
+            AddMaster(manager, 1);
+            for (var mapperId = 2; mapperId <= totalEvaluators; mapperId++)
+            {
+                AddMapper(manager, mapperId);
+            }
+            return manager;
+        }
+
+        /// <summary>
+        /// Adds a mock Evaluator carrying the next master batch id to the manager.
+        /// </summary>
+        /// <param name="manager">The manager to add the Evaluator to</param>
+        /// <param name="id">Numeric suffix of the Evaluator id</param>
+        private void AddMaster(EvaluatorManager manager, int id)
+        {
+            var batchId = EvaluatorManager.MasterBatchId + _nextMasterBatchSequence++;
+            manager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(id, batchId));
+        }
+
+        /// <summary>
+        /// Adds a mock Evaluator carrying the next mapper batch id to the manager.
+        /// </summary>
+        /// <param name="manager">The manager to add the Evaluator to</param>
+        /// <param name="id">Numeric suffix of the Evaluator id</param>
+        private void AddMapper(EvaluatorManager manager, int id)
+        {
+            var batchId = EvaluatorManager.MapperBatchId + _nextMapperBatchSequence++;
+            manager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(id, batchId));
+        }
+
+        /// <summary>
+        /// Creates a mocked IAllocatedEvaluator with the given id suffix and batch id.
+        /// </summary>
+        /// <param name="id">Numeric suffix appended to EvaluatorIdPrefix</param>
+        /// <param name="batchId">Batch id reported by the mock</param>
+        /// <returns>The mocked Evaluator</returns>
+        private static IAllocatedEvaluator CreateMockAllocatedEvaluator(int id, string batchId)
+        {
+            var evaluator = Substitute.For<IAllocatedEvaluator>();
+            evaluator.EvaluatorBatchId.Returns(batchId);
+            evaluator.Id.Returns(EvaluatorIdPrefix + id);
+            return evaluator;
+        }
+
+        /// <summary>
+        /// Creates an empty EvaluatorManager with fixed update and mapper specifications.
+        /// </summary>
+        /// <param name="totalEvaluators">Total number of Evaluators the manager expects</param>
+        /// <param name="allowedNumberOfEvaluatorFailures">Allowed number of Evaluator failures</param>
+        /// <returns>The new EvaluatorManager</returns>
+        private EvaluatorManager CreateEvaluatorManager(int totalEvaluators, int allowedNumberOfEvaluatorFailures)
+        {
+            var updateSpec = new EvaluatorSpecification(500, 2);
+            var mapperSpec = new EvaluatorSpecification(1000, 3);
+            var requestor = CreateMockEvaluatorRequestor();
+            return new EvaluatorManager(
+                totalEvaluators,
+                allowedNumberOfEvaluatorFailures,
+                requestor,
+                updateSpec,
+                mapperSpec);
+        }
+
+        /// <summary>
+        /// Creates a mocked IEvaluatorRequestor.
+        /// </summary>
+        /// <returns>The mocked requestor</returns>
+        private IEvaluatorRequestor CreateMockEvaluatorRequestor()
+        {
+            return Substitute.For<IEvaluatorRequestor>();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/5a8b518e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
new file mode 100644
index 0000000..8fa9876
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
@@ -0,0 +1,334 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.Globalization;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Utilities.Attributes;
+using Org.Apache.REEF.Utilities.Diagnostics;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Manages allocated and failed Evaluators for the driver.
+    /// It tracks the ids of the allocated Evaluators and of the failed Evaluators, and provides
+    /// methods to add and remove Evaluators from those collections, so that the driver has the
+    /// data it needs for fault tolerance.
+    /// It also tracks which Evaluator is the master Evaluator.
+    /// </summary>
+    [NotThreadSafe]
+    internal sealed class EvaluatorManager
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorManager));
+        internal const string MasterBatchId = "MasterBatchId";
+        internal const string MapperBatchId = "MapperBatchId";
+        private int _masterBatchIdSequenceNumber = 0;
+        private int _mapperBatchIdSequenceNumber = 0;
+
+        private readonly ISet<string> _allocatedEvaluatorIds = new HashSet<string>();
+        private readonly ISet<string> _failedEvaluatorIds = new HashSet<string>();
+
+        private readonly int _totalExpectedEvaluators;
+        private readonly int _allowedNumberOfEvaluatorFailures;
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
+        private string _masterEvaluatorId;
+
+        private readonly EvaluatorSpecification _updateEvaluatorSpecification;
+        private readonly EvaluatorSpecification _mapperEvaluatorSpecification;
+
+        /// <summary>
+        /// Creates an EvaluatorManager for the driver which contains the specifications for the Evaluators
+        /// </summary>
+        /// <param name="totalEvaluators">Total number of Evaluators expected to be allocated</param>
+        /// <param name="allowedNumberOfEvaluatorFailures">Allowed maximum number of Evaluator failures</param>
+        /// <param name="evaluatorRequestor">Requestor used to submit Evaluator requests</param>
+        /// <param name="updateEvaluatorSpecification">Specification used for the update/master Evaluator request</param>
+        /// <param name="mapperEvaluatorSpecification">Specification used for the mapper Evaluator requests</param>
+        internal EvaluatorManager(
+            int totalEvaluators,
+            int allowedNumberOfEvaluatorFailures,
+            IEvaluatorRequestor evaluatorRequestor,
+            EvaluatorSpecification updateEvaluatorSpecification,
+            EvaluatorSpecification mapperEvaluatorSpecification)
+        {
+            _totalExpectedEvaluators = totalEvaluators;
+            _allowedNumberOfEvaluatorFailures = allowedNumberOfEvaluatorFailures;
+            _evaluatorRequestor = evaluatorRequestor;
+            _updateEvaluatorSpecification = updateEvaluatorSpecification;
+            _mapperEvaluatorSpecification = mapperEvaluatorSpecification;
+        }
+
+        /// <summary>
+        /// Requests the update/master Evaluator from the resource manager.
+        /// Each request uses a new batch id made of MasterBatchId followed by a sequence number.
+        /// </summary>
+        internal void RequestUpdateEvaluator()
+        {
+            // Build the batch id once so that the submitted and the logged value cannot diverge.
+            var batchId = MasterBatchId + _masterBatchIdSequenceNumber;
+
+            _evaluatorRequestor.Submit(
+                _evaluatorRequestor.NewBuilder()
+                    .SetCores(_updateEvaluatorSpecification.Core)
+                    .SetMegabytes(_updateEvaluatorSpecification.Megabytes)
+                    .SetNumber(1)
+                    .SetEvaluatorBatchId(batchId)
+                    .Build());
+
+            var message = string.Format(CultureInfo.InvariantCulture,
+                "Submitted master evaluator with core [{0}], memory [{1}] and batch id [{2}].",
+                _updateEvaluatorSpecification.Core,
+                _updateEvaluatorSpecification.Megabytes,
+                batchId);
+            Logger.Log(Level.Info, message);
+
+            _masterBatchIdSequenceNumber++;
+        }
+
+        /// <summary>
+        /// Requests mapper Evaluators from the resource manager.
+        /// Each request uses a new batch id made of MapperBatchId followed by a sequence number.
+        /// </summary>
+        /// <param name="numEvaluators">Number of evaluators to request</param>
+        internal void RequestMapEvaluators(int numEvaluators)
+        {
+            // Build the batch id once; the log previously reported a MasterBatchId-based id
+            // that did not match the MapperBatchId-based id actually submitted.
+            var batchId = MapperBatchId + _mapperBatchIdSequenceNumber;
+
+            _evaluatorRequestor.Submit(
+                _evaluatorRequestor.NewBuilder()
+                    .SetMegabytes(_mapperEvaluatorSpecification.Megabytes)
+                    .SetNumber(numEvaluators)
+                    .SetCores(_mapperEvaluatorSpecification.Core)
+                    .SetEvaluatorBatchId(batchId)
+                    .Build());
+
+            var message = string.Format(CultureInfo.InvariantCulture,
+                "Submitted [{0}] mapper evaluators with core [{1}], memory [{2}] and batch id [{3}].",
+                numEvaluators,
+                _mapperEvaluatorSpecification.Core,
+                _mapperEvaluatorSpecification.Megabytes,
+                batchId);
+            Logger.Log(Level.Info, message);
+
+            _mapperBatchIdSequenceNumber++;
+        }
+
+        /// <summary>
+        /// Adds an Evaluator id to the allocated Evaluator collection.
+        /// If the IAllocatedEvaluator is for master, sets the master Evaluator id.
+        /// IMRUSystemException will be thrown in the following cases:
+        ///   The Evaluator id is already in the allocated Evaluator collection
+        ///   The number of allocated Evaluators has already reached the total expected Evaluators
+        ///   The Evaluator is for master but a master Evaluator is already set
+        ///   The added IAllocatedEvaluator is the last one expected, and the master Evaluator is still not added
+        /// </summary>
+        /// <param name="evaluator">The newly allocated Evaluator</param>
+        internal void AddAllocatedEvaluator(IAllocatedEvaluator evaluator)
+        {
+            if (IsAllocatedEvaluator(evaluator.Id))
+            {
+                string msg = string.Format(CultureInfo.InvariantCulture,
+                    "The allocated evaluator {0} already exists.", evaluator.Id);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            // Check the capacity before touching the master id, so that a rejected Evaluator
+            // can never be left registered as the master.
+            if (NumberOfAllocatedEvaluators >= _totalExpectedEvaluators)
+            {
+                string msg = string.Format(CultureInfo.InvariantCulture,
+                    "Trying to add an additional Evaluator {0}, but the total expected Evaluator number {1} has been reached.",
+                    evaluator.Id,
+                    _totalExpectedEvaluators);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+
+            if (IsEvaluatorForMaster(evaluator))
+            {
+                SetMasterEvaluatorId(evaluator.Id);
+            }
+
+            _allocatedEvaluatorIds.Add(evaluator.Id);
+
+            if (_masterEvaluatorId == null && NumberOfAllocatedEvaluators == _totalExpectedEvaluators)
+            {
+                string msg = string.Format(CultureInfo.InvariantCulture,
+                    "Added the last Evaluator {0} but master Evaluator is not added yet.", evaluator.Id);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+        }
+
+        /// <summary>
+        /// Removes an Evaluator from the allocated Evaluator collection by evaluator id.
+        /// If the given evaluator id is not in the allocated Evaluator collection, throws IMRUSystemException.
+        /// </summary>
+        /// <param name="evaluatorId">Id of the Evaluator to remove</param>
+        internal void RemoveAllocatedEvaluator(string evaluatorId)
+        {
+            if (!IsAllocatedEvaluator(evaluatorId))
+            {
+                string msg = string.Format(CultureInfo.InvariantCulture,
+                    "The allocated evaluator to be removed {0} does not exist.", evaluatorId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+            _allocatedEvaluatorIds.Remove(evaluatorId);
+        }
+
+        /// <summary>
+        /// Returns number of allocated Evaluators
+        /// </summary>
+        internal int NumberOfAllocatedEvaluators
+        {
+            get { return _allocatedEvaluatorIds.Count; }
+        }
+
+        /// <summary>
+        /// Checks if the Evaluator with the specified id has been allocated
+        /// </summary>
+        /// <param name="evaluatorId">Id of the Evaluator to look up</param>
+        /// <returns>true if the id is in the allocated Evaluator collection</returns>
+        internal bool IsAllocatedEvaluator(string evaluatorId)
+        {
+            return _allocatedEvaluatorIds.Contains(evaluatorId);
+        }
+
+        /// <summary>
+        /// Checks if all the expected Evaluators are allocated, including the master Evaluator.
+        /// </summary>
+        /// <returns>true if the expected number of Evaluators is allocated and a master Evaluator is set</returns>
+        internal bool AreAllEvaluatorsAllocated()
+        {
+            return _totalExpectedEvaluators == NumberOfAllocatedEvaluators && _masterEvaluatorId != null;
+        }
+
+        /// <summary>
+        /// Records a failed Evaluator.
+        /// Removes it from the allocated Evaluators and adds it to the failed Evaluators collection.
+        /// Throws IMRUSystemException if the evaluatorId is not an allocated Evaluator,
+        /// or if it is already in the failed Evaluators collection.
+        /// </summary>
+        /// <param name="evaluatorId">Id of the failed Evaluator</param>
+        internal void RecordFailedEvaluator(string evaluatorId)
+        {
+            RemoveAllocatedEvaluator(evaluatorId);
+
+            if (_failedEvaluatorIds.Contains(evaluatorId))
+            {
+                string msg = string.Format(CultureInfo.InvariantCulture,
+                    "The failed evaluator {0} has been recorded.", evaluatorId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+            _failedEvaluatorIds.Add(evaluatorId);
+        }
+
+        /// <summary>
+        /// Checks if the number of failed Evaluators has reached the allowed maximum number of evaluator failures
+        /// </summary>
+        /// <returns>true if at least AllowedNumberOfEvaluatorFailures Evaluators are recorded as failed</returns>
+        internal bool ReachedMaximumNumberOfEvaluatorFailures()
+        {
+            return _failedEvaluatorIds.Count >= AllowedNumberOfEvaluatorFailures;
+        }
+
+        /// <summary>
+        /// Returns allowed maximum number of evaluator failures
+        /// </summary>
+        internal int AllowedNumberOfEvaluatorFailures
+        {
+            get { return _allowedNumberOfEvaluatorFailures; }
+        }
+
+        /// <summary>
+        /// Resets the failed Evaluator collection.
+        /// If the master Evaluator is among the failed Evaluators, the master Evaluator id is reset as well.
+        /// </summary>
+        internal void ResetFailedEvaluators()
+        {
+            // Must run before Clear(): IsMasterEvaluatorFailed() reads the failed collection.
+            if (IsMasterEvaluatorFailed())
+            {
+                ResetMasterEvaluatorId();
+            }
+            _failedEvaluatorIds.Clear();
+        }
+
+        /// <summary>
+        /// Sets master Evaluator id.
+        /// Throws IMRUSystemException if evaluatorId is null or _masterEvaluatorId is not null.
+        /// </summary>
+        /// <param name="evaluatorId">Id of the master Evaluator</param>
+        private void SetMasterEvaluatorId(string evaluatorId)
+        {
+            if (evaluatorId == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("Master evaluatorId should not be null."), Logger);
+            }
+
+            if (_masterEvaluatorId != null)
+            {
+                string msg = string.Format(CultureInfo.InvariantCulture,
+                    "There is already a master evaluator {0}", _masterEvaluatorId);
+                Exceptions.Throw(new IMRUSystemException(msg), Logger);
+            }
+            _masterEvaluatorId = evaluatorId;
+        }
+
+        /// <summary>
+        /// Sets master Evaluator id to null.
+        /// If the master Evaluator id is already null, throws IMRUSystemException
+        /// </summary>
+        private void ResetMasterEvaluatorId()
+        {
+            if (_masterEvaluatorId == null)
+            {
+                Exceptions.Throw(new IMRUSystemException("Master evaluator is already null"), Logger);
+            }
+            _masterEvaluatorId = null;
+        }
+
+        /// <summary>
+        /// Checks if the IAllocatedEvaluator is for master, i.e. its batch id starts with MasterBatchId
+        /// </summary>
+        /// <param name="evaluator">The allocated Evaluator to check</param>
+        /// <returns>true if the Evaluator's batch id starts with MasterBatchId</returns>
+        internal bool IsEvaluatorForMaster(IAllocatedEvaluator evaluator)
+        {
+            // Batch ids are machine identifiers, so compare ordinally rather than by current culture.
+            return evaluator.EvaluatorBatchId.StartsWith(MasterBatchId, System.StringComparison.Ordinal);
+        }
+
+        /// <summary>
+        /// Checks if the evaluator id is the master evaluator id
+        /// </summary>
+        /// <param name="evaluatorId">Id of the Evaluator to check</param>
+        /// <returns>true if a master Evaluator is set and its id equals evaluatorId</returns>
+        internal bool IsMasterEvaluatorId(string evaluatorId)
+        {
+            if (_masterEvaluatorId != null)
+            {
+                return _masterEvaluatorId.Equals(evaluatorId);
+            }
+            return false;
+        }
+
+        /// <summary>
+        /// Checks if the master Evaluator failed
+        /// </summary>
+        /// <returns>true if a master Evaluator is set and its id is in the failed Evaluator collection</returns>
+        internal bool IsMasterEvaluatorFailed()
+        {
+            return _masterEvaluatorId != null && _failedEvaluatorIds.Contains(_masterEvaluatorId);
+        }
+
+        /// <summary>
+        /// Returns number of failed mapper Evaluators, i.e. the failed Evaluators excluding a failed master
+        /// </summary>
+        /// <returns>Number of failed Evaluators, minus one if the master Evaluator is among them</returns>
+        internal int NumberofFailedMappers()
+        {
+            if (IsMasterEvaluatorFailed())
+            {
+                return _failedEvaluatorIds.Count - 1;
+            }
+            return _failedEvaluatorIds.Count;
+        }
+
+        /// <summary>
+        /// Returns number of missing Evaluators: the total expected minus the currently allocated ones
+        /// </summary>
+        /// <returns>Number of Evaluators still to be allocated</returns>
+        internal int NumberOfMissingEvaluators()
+        {
+            return _totalExpectedEvaluators - NumberOfAllocatedEvaluators;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/5a8b518e/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorSpecification.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorSpecification.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorSpecification.cs
new file mode 100644
index 0000000..6ecd7c6
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorSpecification.cs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+// 
+//   http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Driver
+{
+    /// <summary>
+    /// Immutable description of the resources requested for an Evaluator
+    /// </summary>
+    internal sealed class EvaluatorSpecification
+    {
+        private readonly int _megabytes;
+        private readonly int _core;
+
+        /// <summary>
+        /// Creates an EvaluatorSpecification from a memory size and a core count
+        /// </summary>
+        /// <param name="megabytes">Memory size for the Evaluator request</param>
+        /// <param name="core">Number of cores for the Evaluator request</param>
+        internal EvaluatorSpecification(int megabytes, int core)
+        {
+            _megabytes = megabytes;
+            _core = core;
+        }
+
+        /// <summary>
+        /// Memory size for the Evaluator request, as given to the constructor
+        /// </summary>
+        internal int Megabytes
+        {
+            get { return _megabytes; }
+        }
+
+        /// <summary>
+        /// Number of cores for the Evaluator request, as given to the constructor
+        /// </summary>
+        internal int Core
+        {
+            get { return _core; }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/5a8b518e/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj 
b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 958040a..2f5cf03 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -72,6 +72,8 @@ under the License.
     <Compile Include="OnREEF\Driver\ContextAndServiceConfiguration.cs" />
     <Compile Include="OnREEF\Driver\ActiveContextManager.cs" />
     <Compile Include="OnREEF\Driver\DataLoadingContext.cs" />
+    <Compile Include="OnREEF\Driver\EvaluatorManager.cs" />
+    <Compile Include="OnREEF\Driver\EvaluatorSpecification.cs" />
     <Compile 
Include="OnREEF\Driver\MaximumNumberOfEvalutorFailuresExceededException.cs" />
     <Compile Include="OnREEF\Driver\IMRUSystemException.cs" />
     <Compile Include="OnREEF\Driver\IMRUConstants.cs" />

http://git-wip-us.apache.org/repos/asf/reef/blob/5a8b518e/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
index dce71ac..5dddf3c 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/FaultTolerant/TestContextStart.cs
@@ -98,7 +98,7 @@ namespace Org.Apache.REEF.Tests.Functional.FaultTolerant
             }
 
             public void OnNext(IActiveContext value)
-            {               
+            {
                 Logger.Log(Level.Info, "IActiveContext: " + value.Id);
 
                 if (_first)

Reply via email to