Repository: reef Updated Branches: refs/heads/master cf8275e39 -> 6a1b710b0
[REEF-1224] IMRU Fault Tolerance - Separate Data downloading from Task injection This addressed the issue by * forcing IDataPartition to be instantiated at root context and service level in mappers * instantiating the Network service at task level to make fault tolerance easier JIRA: [REEF-1224](https://issues.apache.org/jira/browse/REEF-1224) Pull Request: This closes #952 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/6a1b710b Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/6a1b710b Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/6a1b710b Branch: refs/heads/master Commit: 6a1b710b08df0906f48da44d1374522e9dc64865 Parents: cf8275e Author: Dhruv <[email protected]> Authored: Tue Mar 22 14:05:08 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Apr 22 10:45:27 2016 -0700 ---------------------------------------------------------------------- .../MapperCount/IdentityMapFunction.cs | 4 +- .../MapperCount/MapperCount.cs | 3 +- .../PipelinedBroadcastAndReduce.cs | 3 +- lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs | 5 +- .../InProcess/InProcessIMRUClient.cs | 3 +- .../OnREEF/Client/REEFIMRUClient.cs | 19 +- .../OnREEF/Driver/DataLoadingContext.cs | 71 ++++ .../OnREEF/Driver/IMRUConstants.cs | 1 + .../OnREEF/Driver/IMRUDriver.cs | 387 ++++++++++++------- .../OnREEF/Driver/IMRUSystemException.cs | 57 +++ ...NumberOfEvalutorFailuresExceededException.cs | 47 +++ .../ServiceAndContextConfigurationProvider.cs | 298 +++++++++----- .../Org.Apache.REEF.IMRU.csproj | 3 + 13 files changed, 653 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs index 35228a2..2eca3fd 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/IdentityMapFunction.cs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +using System.IO; using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IO.PartitionedData; using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.IMRU.Examples.MapperCount @@ -26,7 +28,7 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount public sealed class IdentityMapFunction : IMapFunction<int, int> { [Inject] - private IdentityMapFunction() + private IdentityMapFunction(IInputPartition<Stream> dataPartition) { } http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs index e00370b..a559ff8 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/MapperCount/MapperCount.cs @@ -16,6 +16,7 @@ // under the License. using System; +using System.IO; using System.Linq; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IMRU.OnREEF.Parameters; @@ -49,7 +50,7 @@ namespace Org.Apache.REEF.IMRU.Examples.MapperCount /// <returns>The number of MapFunction instances that are part of the job.</returns> public int Run(int numberofMappers, string outputFile, IConfiguration fileSystemConfig) { - var results = _imruClient.Submit<int, int, int>( + var results = _imruClient.Submit<int, int, int, Stream>( new IMRUJobDefinitionBuilder() .SetMapFunctionConfiguration(IMRUMapConfiguration<int, int>.ConfigurationModule .Set(IMRUMapConfiguration<int, int>.MapFunction, GenericType<IdentityMapFunction>.Class) http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs index b438fbf..de1598c 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/PipelinedBroadcastReduce/PipelinedBroadcastAndReduce.cs @@ -16,6 +16,7 @@ // under the License. using System.Globalization; +using System.IO; using Org.Apache.REEF.IMRU.API; using Org.Apache.REEF.IO.PartitionedData.Random; using Org.Apache.REEF.Tang.Annotations; @@ -73,7 +74,7 @@ namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce chunkSize.ToString(CultureInfo.InvariantCulture)) .Build(); - var results = _imruClient.Submit<int[], int[], int[]>( + var results = _imruClient.Submit<int[], int[], int[], Stream>( new IMRUJobDefinitionBuilder() .SetMapFunctionConfiguration(IMRUMapConfiguration<int[], int[]>.ConfigurationModule .Set(IMRUMapConfiguration<int[], int[]>.MapFunction, http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs index 2fa6b3c..bf92820 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUClient.cs @@ -22,16 +22,17 @@ using Org.Apache.REEF.Utilities.Attributes; namespace Org.Apache.REEF.IMRU.API { public interface IIMRUClient - { + { /// <summary> /// Submit the given job for execution. /// </summary> /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> /// <typeparam name="TResult">The return type of the computation.</typeparam> + /// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam> /// <param name="jobDefinition">IMRU job definition</param> /// <returns>Result of IMRU</returns> - IEnumerable<TResult> Submit<TMapInput, TMapOutput, TResult>(IMRUJobDefinition jobDefinition); + IEnumerable<TResult> Submit<TMapInput, TMapOutput, TResult, TPartitionType>(IMRUJobDefinition jobDefinition); /// <summary> /// DriverHttpEndPoint returned by IReefClient after job submission http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs index d79e4c4..a2acf5a 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/InProcess/InProcessIMRUClient.cs @@ -64,9 +64,10 @@ namespace Org.Apache.REEF.IMRU.InProcess /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> /// <typeparam name="TResult">The return type of the computation.</typeparam> + /// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam> /// <param name="jobDefinition">Job definition given by the user</param> /// <returns>The result of the job</returns> - public IEnumerable<TResult> Submit<TMapInput, TMapOutput, TResult>(IMRUJobDefinition jobDefinition) + public IEnumerable<TResult> Submit<TMapInput, TMapOutput, TResult, TPartitionType>(IMRUJobDefinition jobDefinition) { IConfiguration overallPerMapConfig = null; try http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs index c2f82fd..969a874 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs @@ -69,9 +69,10 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client /// <typeparam name="TMapInput">The type of the side information provided to the Map function</typeparam> /// <typeparam name="TMapOutput">The return type of the Map function</typeparam> /// <typeparam name="TResult">The return type of the computation.</typeparam> + /// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam> /// <param name="jobDefinition">IMRU job definition given by the user</param> /// <returns>Null as results will be later written to some directory</returns> - IEnumerable<TResult> IIMRUClient.Submit<TMapInput, TMapOutput, TResult>(IMRUJobDefinition jobDefinition) + IEnumerable<TResult> IIMRUClient.Submit<TMapInput, TMapOutput, TResult, TPartitionType>(IMRUJobDefinition jobDefinition) { string driverId = string.Format("IMRU-{0}-Driver", jobDefinition.JobName); IConfiguration overallPerMapConfig = null; @@ -90,15 +91,19 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client { DriverConfiguration.ConfigurationModule .Set(DriverConfiguration.OnEvaluatorAllocated, - GenericType<IMRUDriver<TMapInput, TMapOutput, TResult>>.Class) + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(DriverConfiguration.OnDriverStarted, - GenericType<IMRUDriver<TMapInput, TMapOutput, TResult>>.Class) + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(DriverConfiguration.OnContextActive, - GenericType<IMRUDriver<TMapInput, TMapOutput, TResult>>.Class) + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(DriverConfiguration.OnTaskCompleted, - GenericType<IMRUDriver<TMapInput, TMapOutput, TResult>>.Class) + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(DriverConfiguration.OnEvaluatorFailed, - GenericType<IMRUDriver<TMapInput, TMapOutput, TResult>>.Class) + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(DriverConfiguration.OnContextFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(DriverConfiguration.OnTaskFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) .Set(DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString()) .Build(), TangFactory.GetTang().NewConfigurationBuilder() @@ -146,7 +151,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Client // The JobSubmission contains the Driver configuration as well as the files needed on the Driver. var imruJobSubmission = _jobRequestBuilder .AddDriverConfiguration(imruDriverConfiguration) - .AddGlobalAssemblyForType(typeof(IMRUDriver<TMapInput, TMapOutput, TResult>)) + .AddGlobalAssemblyForType(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>)) .SetJobIdentifier(jobDefinition.JobName) .SetDriverMemory(5000) .Build(); http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs new file mode 100644 index 0000000..5ff36de --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/DataLoadingContext.cs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.IO.PartitionedData; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Utilities.Diagnostics; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.IMRU.OnREEF.Driver +{ + /// <summary> + /// This is part of Root context of map evaluators and instantiates + /// input partition and later will call AddCache function also once + /// REEF-1339 is resolved. + /// </summary> + /// <typeparam name="T">Data Handle Type</typeparam> + internal class DataLoadingContext<T> : IObserver<IContextStart> + { + private static readonly Logger Logger = Logger.GetLogger(typeof(DataLoadingContext<T>)); + private readonly IInputPartition<T> _partition; + + [Inject] + private DataLoadingContext(IInputPartition<T> partition) + { + _partition = partition; + Logger.Log(Level.Verbose, "Entered data loading context"); + } + + /// <summary> + /// Specifies what to do when context starts. + /// </summary> + /// <param name="value">context start token</param> + /// TODO[REEF-1339] - AddCache() function of IInputPartition will be called here. + public void OnNext(IContextStart value) + { + } + + /// <summary> + /// Specifies what to do if error occurs. We throw + /// the caught exception in this case. + /// </summary> + /// <param name="error">Exception</param> + public void OnError(Exception error) + { + Exceptions.Throw(error, "Error occured in Data Loading context start", Logger); + } + + /// <summary> + /// Specifies what to do at completion. In this case do nothing. + /// </summary> + public void OnCompleted() + { + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs index dd9edf2..f4fba2e 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUConstants.cs @@ -25,5 +25,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver internal const int TreeFanout = 2; internal const string MapTaskPrefix = "IMRUMap"; internal const string UpdateTaskName = "IMRUMaster"; + internal const string MasterContextId = "MasterRootContext"; } } http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs index 9197703..59be761 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs @@ -31,6 +31,7 @@ using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.IMRU.OnREEF.ResultHandler; using Org.Apache.REEF.IO.PartitionedData; +using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Pipelining; @@ -53,32 +54,38 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <typeparam name="TMapInput">Map Input</typeparam> /// <typeparam name="TMapOutput">Map output</typeparam> /// <typeparam name="TResult">Result</typeparam> - internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult> : IObserver<IDriverStarted>, - IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<ICompletedTask>, IObserver<IFailedEvaluator> + /// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam> + internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> + : IObserver<IDriverStarted>, + IObserver<IAllocatedEvaluator>, + IObserver<IActiveContext>, + IObserver<ICompletedTask>, + IObserver<IFailedEvaluator>, + IObserver<IFailedContext>, + IObserver<IFailedTask> { - private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult>)); + private static readonly Logger Logger = + Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>)); private readonly ConfigurationManager _configurationManager; - private readonly IPartitionedInputDataSet _dataSet; + private readonly int _totalMappers; private readonly IEvaluatorRequestor _evaluatorRequestor; private ICommunicationGroupDriver _commGroup; private readonly IGroupCommDriver _groupCommDriver; private readonly TaskStarter _groupCommTaskStarter; - private readonly ConcurrentStack<string> _taskIdStack; private readonly ConcurrentStack<IConfiguration> _perMapperConfiguration; - private readonly Stack<IPartitionDescriptor> _partitionDescriptorStack; private readonly int _coresPerMapper; private readonly int _coresForUpdateTask; private readonly int _memoryPerMapper; private readonly int _memoryForUpdateTask; private readonly ISet<IPerMapperConfigGenerator> _perMapperConfigs; - private readonly ConcurrentBag<ICompletedTask> _completedTasks; + private readonly ISet<ICompletedTask> _completedTasks = new HashSet<ICompletedTask>(); private readonly int _allowedFailedEvaluators; private int _currentFailedEvaluators = 0; - private bool _reachedUpdateTaskActiveContext = false; private readonly bool _invokeGC; + private int _numberOfReadyTasks = 0; - private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput> + private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> _serviceAndContextConfigurationProvider; [Inject] @@ -94,7 +101,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver [Parameter(typeof(InvokeGC))] bool invokeGC, IGroupCommDriver groupCommDriver) { - _dataSet = dataSet; _configurationManager = configurationManager; _evaluatorRequestor = evaluatorRequestor; _groupCommDriver = groupCommDriver; @@ -103,23 +109,23 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver _memoryPerMapper = memoryPerMapper; _memoryForUpdateTask = memoryForUpdateTask; _perMapperConfigs = perMapperConfigs; - _completedTasks = new ConcurrentBag<ICompletedTask>(); + _totalMappers = dataSet.Count; + _allowedFailedEvaluators = (int)(failedEvaluatorsFraction * dataSet.Count); _invokeGC = invokeGC; AddGroupCommunicationOperators(); - _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _dataSet.Count + 1); - - _taskIdStack = new ConcurrentStack<string>(); - _perMapperConfiguration = new ConcurrentStack<IConfiguration>(); - _partitionDescriptorStack = new Stack<IPartitionDescriptor>(); - ConstructTaskIdAndPartitionDescriptorStack(); + _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _totalMappers + 1); + _perMapperConfiguration = ConstructPerMapperConfigStack(_totalMappers); _serviceAndContextConfigurationProvider = - new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput>(dataSet.Count + 1, groupCommDriver, - _configurationManager, _partitionDescriptorStack); - - var msg = string.Format("map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}", - _memoryPerMapper, _memoryForUpdateTask, _coresPerMapper, _coresForUpdateTask); + new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>(dataSet); + + var msg = + string.Format("map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}", + _memoryPerMapper, + _memoryForUpdateTask, + _coresPerMapper, + _coresForUpdateTask); Logger.Log(Level.Info, msg); } @@ -136,177 +142,267 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <summary> /// Specifies context and service configuration for evaluator depending /// on whether it is for Update function or for map function - /// Also handles evaluator failures /// </summary> /// <param name="allocatedEvaluator">The allocated evaluator</param> public void OnNext(IAllocatedEvaluator allocatedEvaluator) { - var configs = _serviceAndContextConfigurationProvider.GetNextConfiguration(allocatedEvaluator.Id); + var configs = + _serviceAndContextConfigurationProvider.GetContextConfigurationForEvaluatorById(allocatedEvaluator.Id); allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service); } /// <summary> - /// Specfies the Map or Update task to run on the active context + /// Specifies the Map or Update task to run on the active context /// </summary> /// <param name="activeContext"></param> public void OnNext(IActiveContext activeContext) { Logger.Log(Level.Verbose, string.Format("Received Active Context {0}", activeContext.Id)); - if (_groupCommDriver.IsMasterTaskContext(activeContext)) + if (_serviceAndContextConfigurationProvider.IsMasterEvaluatorId(activeContext.EvaluatorId)) { - _reachedUpdateTaskActiveContext = true; - RequestMapEvaluators(_dataSet.Count); - - var partialTaskConf = - TangFactory.GetTang() - .NewConfigurationBuilder(new[] - { - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, - IMRUConstants.UpdateTaskName) - .Set(TaskConfiguration.Task, - GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class) - .Build(), - _configurationManager.UpdateFunctionConfiguration, - _configurationManager.ResultHandlerConfiguration - }) - .BindNamedParameter(typeof(InvokeGC), _invokeGC.ToString()) - .Build(); - - try - { - TangFactory.GetTang() - .NewInjector(partialTaskConf, _configurationManager.UpdateFunctionCodecsConfiguration) - .GetInstance<IIMRUResultHandler<TResult>>(); - } - catch (InjectionException) - { - partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(partialTaskConf) - .BindImplementation(GenericType<IIMRUResultHandler<TResult>>.Class, - GenericType<DefaultResultHandler<TResult>>.Class) - .Build(); - Logger.Log(Level.Warning, - "User has not given any way to handle IMRU result, defaulting to ignoring it"); - } - + Logger.Log(Level.Verbose, "Submitting master task"); _commGroup.AddTask(IMRUConstants.UpdateTaskName); - _groupCommTaskStarter.QueueTask(partialTaskConf, activeContext); + _groupCommTaskStarter.QueueTask(GetUpdateTaskConfiguration(), activeContext); + RequestMapEvaluators(_totalMappers); } else { - string taskId; - - if (!_taskIdStack.TryPop(out taskId)) - { - Logger.Log(Level.Warning, "No task Ids exist for the active context {0}. Disposing the context.", - activeContext.Id); - activeContext.Dispose(); - return; - } - - IConfiguration mapSpecificConfig; - - if (!_perMapperConfiguration.TryPop(out mapSpecificConfig)) - { - Logger.Log(Level.Warning, - "No per map configuration exist for the active context {0}. Disposing the context.", - activeContext.Id); - activeContext.Dispose(); - return; - } - - var partialTaskConf = - TangFactory.GetTang() - .NewConfigurationBuilder(new[] - { - TaskConfiguration.ConfigurationModule - .Set(TaskConfiguration.Identifier, taskId) - .Set(TaskConfiguration.Task, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class) - .Build(), - _configurationManager.MapFunctionConfiguration, - mapSpecificConfig - }) - .BindNamedParameter(typeof(InvokeGC), _invokeGC.ToString()) - .Build(); - + Logger.Log(Level.Verbose, "Submitting map task"); + _serviceAndContextConfigurationProvider.RecordActiveContextPerEvaluatorId(activeContext.EvaluatorId); + string taskId = GetTaskIdByEvaluatorId(activeContext.EvaluatorId); _commGroup.AddTask(taskId); - _groupCommTaskStarter.QueueTask(partialTaskConf, activeContext); + _groupCommTaskStarter.QueueTask(GetMapTaskConfiguration(activeContext, taskId), activeContext); + Interlocked.Increment(ref _numberOfReadyTasks); + Logger.Log(Level.Verbose, string.Format("{0} Tasks are ready for submission", _numberOfReadyTasks)); } } /// <summary> - /// Specfies what to do when the task is completed + /// Specifies what to do when the task is completed /// In this case just disposes off the task /// </summary> /// <param name="completedTask">The link to the completed task</param> public void OnNext(ICompletedTask completedTask) { - _completedTasks.Add(completedTask); - - if (_completedTasks.Count != _dataSet.Count + 1) + lock (_completedTasks) { - return; - } - - foreach (var task in _completedTasks) - { - Logger.Log(Level.Verbose, string.Format("Disposing task: {0}", task.Id)); - task.ActiveContext.Dispose(); + Logger.Log(Level.Info, + string.Format("Received completed task message from task Id: {0}", completedTask.Id)); + _completedTasks.Add(completedTask); + + if (AreIMRUTasksCompleted()) + { + ShutDownAllEvaluators(); + } } } + /// <summary> + /// Specifies what to do when evaluator fails. + /// If we get all completed tasks then ignore the failure + /// Else request a new evaluator. If failure happens in middle of IMRU + /// job we expect neighboring evaluators to fail while doing + /// communication and will use FailedTask and FailedContext logic to + /// order shutdown. + /// </summary> + /// <param name="value"></param> public void OnNext(IFailedEvaluator value) { - Logger.Log(Level.Info, "An evaluator failed, checking if it failed before context and service was submitted"); - int currFailedEvaluators = Interlocked.Increment(ref _currentFailedEvaluators); - - if (value.FailedContexts != null && value.FailedContexts.Count != 0) + if (AreIMRUTasksCompleted()) { - Logger.Log(Level.Info, "Some active context failed, cannot continue IMRU task"); - Exceptions.Throw(new Exception(), Logger); + Logger.Log(Level.Info, + string.Format("Evaluator with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id)); + return; } + Logger.Log(Level.Info, + string.Format("Evaluator with Id: {0} failed with Exception: {1}", value.Id, value.EvaluatorException)); + int currFailedEvaluators = Interlocked.Increment(ref _currentFailedEvaluators); if (currFailedEvaluators > _allowedFailedEvaluators) { - Exceptions.Throw(new Exception("Cannot continue IMRU job, Failed evaluators reach maximum limit"), + Exceptions.Throw(new MaximumNumberOfEvaluatorFailuresExceededException(_allowedFailedEvaluators), Logger); } - Logger.Log(Level.Info, "Requesting for the failed evaluator again"); + _serviceAndContextConfigurationProvider.RecordEvaluatorFailureById(value.Id); + bool isMaster = _serviceAndContextConfigurationProvider.IsMasterEvaluatorId(value.Id); - _serviceAndContextConfigurationProvider.EvaluatorFailed(value.Id); - - // if active context stage is reached for Update Task then assume that failed - // evaluator belongs to mapper - if (_reachedUpdateTaskActiveContext) + // If failed evaluator is master then ask for master + // evaluator else ask for mapper evaluator + if (!isMaster) { + Logger.Log(Level.Info, string.Format("Requesting a replacement map Evaluator for {0}", value.Id)); RequestMapEvaluators(1); } else { + Logger.Log(Level.Info, string.Format("Requesting a replacement master Evaluator for {0}", value.Id)); RequestUpdateEvaluator(); } } /// <summary> - /// Specfies how to handle exception or error + /// Specifies what to do if Failed Context is received. + /// An exception is thrown if tasks are not completed. /// </summary> - /// <param name="error">Kind of exception</param> - public void OnError(Exception error) + /// <param name="value"></param> + public void OnNext(IFailedContext value) { - Logger.Log(Level.Error, "Cannot currently handle the Exception in OnError function"); - throw new NotImplementedException("Cannot currently handle exception in OneError", error); + if (AreIMRUTasksCompleted()) + { + Logger.Log(Level.Info, + string.Format("Context with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id)); + return; + } + Exceptions.Throw(new Exception(string.Format("Data Loading Context with Id: {0} failed", value.Id)), Logger); } /// <summary> - /// Specfies what to do when driver is done - /// In this case do nothing + /// Specifies what to do if a task fails. + /// We throw the exception and fail IMRU unless IMRU job is already done. /// </summary> + /// <param name="value"></param> + public void OnNext(IFailedTask value) + { + if (AreIMRUTasksCompleted()) + { + Logger.Log(Level.Info, + string.Format("Task with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id)); + return; + } + Exceptions.Throw(new Exception(string.Format("Task with Id: {0} failed", value.Id)), Logger); + } + + public void OnError(Exception error) + { + } + public void OnCompleted() { } + private bool AreIMRUTasksCompleted() + { + return _completedTasks.Count >= _totalMappers + 1; + } + + private string GetTaskIdByEvaluatorId(string evaluatorId) + { + return string.Format("{0}-{1}-Version0", + IMRUConstants.MapTaskPrefix, + _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId)); + } + + /// <summary> + /// Shuts down evaluators once all completed task messages are received + /// </summary> + private void ShutDownAllEvaluators() + { + foreach (var task in _completedTasks) + { + Logger.Log(Level.Info, string.Format("Disposing task: {0}", task.Id)); + task.ActiveContext.Dispose(); + } + } + + /// <summary> + /// Generates map task configuration given the active context. + /// Merge configurations of all the inputs to the MapTaskHost. + /// </summary> + /// <param name="activeContext">Active context to which task needs to be submitted</param> + /// <param name="taskId">Task Id</param> + /// <returns>Map task configuration</returns> + private IConfiguration GetMapTaskConfiguration(IActiveContext activeContext, string taskId) + { + IConfiguration mapSpecificConfig; + + if (!_perMapperConfiguration.TryPop(out mapSpecificConfig)) + { + Exceptions.Throw( + new IMRUSystemException(string.Format("No per map configuration exist for the active context {0}", + activeContext.Id)), + Logger); + } + + return TangFactory.GetTang() + .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, taskId) + .Set(TaskConfiguration.Task, GenericType<MapTaskHost<TMapInput, TMapOutput>>.Class) + .Build(), + _configurationManager.MapFunctionConfiguration, + mapSpecificConfig, + GetGroupCommConfiguration()) + .BindNamedParameter<InvokeGC, bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString()) + .Build(); + } + + /// <summary> + /// Generates the update task configuration. + /// Merge configurations of all the inputs to the UpdateTaskHost. + /// </summary> + /// <returns>Update task configuration</returns> + private IConfiguration GetUpdateTaskConfiguration() + { + var partialTaskConf = + TangFactory.GetTang() + .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule + .Set(TaskConfiguration.Identifier, + IMRUConstants.UpdateTaskName) + .Set(TaskConfiguration.Task, + GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class) + .Build(), + _configurationManager.UpdateFunctionConfiguration, + _configurationManager.ResultHandlerConfiguration, + GetGroupCommConfiguration()) + .BindNamedParameter<InvokeGC, bool>(GenericType<InvokeGC>.Class, _invokeGC.ToString()) + .Build(); + + // This piece of code basically checks if user has given any implementation + // of IIMRUResultHandler. If not then bind it to default implementation which + // does nothing. For interfaces with generic type we cannot assign default + // implementation. + try + { + TangFactory.GetTang() + .NewInjector(partialTaskConf) + .GetInstance<IIMRUResultHandler<TResult>>(); + } + catch (InjectionException) + { + partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(partialTaskConf) + .BindImplementation(GenericType<IIMRUResultHandler<TResult>>.Class, + GenericType<DefaultResultHandler<TResult>>.Class) + .Build(); + Logger.Log(Level.Info, + "User has not given any way to handle IMRU result, defaulting to ignoring it"); + } + return partialTaskConf; + } + + /// <summary> + /// Generate the group communicaiton configuration to be added + /// to the tasks + /// </summary> + /// <returns>The group communication configuration</returns> + private IConfiguration GetGroupCommConfiguration() + { + var codecConfig = + TangFactory.GetTang() + .NewConfigurationBuilder( + StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set( + StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec, + GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(), + StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(), + _configurationManager.UpdateFunctionCodecsConfiguration) + .Build(); + + return Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig); + } + + /// <summary> + /// Adds broadcast and reduce operators to the default communication group + /// </summary> private void AddGroupCommunicationOperators() { var reduceFunctionConfig = _configurationManager.ReduceFunctionConfiguration; @@ -369,23 +465,31 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver .Build(); } - private void ConstructTaskIdAndPartitionDescriptorStack() + /// <summary> + /// Construct the stack of map configuraion which + /// is specific to each mapper. If user does not + /// specify any then its empty configuration + /// </summary> + /// <param name="totalMappers">Total mappers</param> + /// <returns>Stack of configuration</returns> + private ConcurrentStack<IConfiguration> ConstructPerMapperConfigStack(int totalMappers) { - int counter = 0; - - foreach (var partitionDescriptor in _dataSet) + var perMapperConfiguration = new ConcurrentStack<IConfiguration>(); + for (int i = 0; i < totalMappers; i++) { - string id = IMRUConstants.MapTaskPrefix + "-Id" + counter + "-Version0"; - _taskIdStack.Push(id); - _partitionDescriptorStack.Push(partitionDescriptor); - var emptyConfig = TangFactory.GetTang().NewConfigurationBuilder().Build(); - IConfiguration config = _perMapperConfigs.Aggregate(emptyConfig, (current, configGenerator) => Configurations.Merge(current, configGenerator.GetMapperConfiguration(counter, _dataSet.Count))); - _perMapperConfiguration.Push(config); - counter++; + IConfiguration config = _perMapperConfigs.Aggregate(emptyConfig, + (current, configGenerator) => + Configurations.Merge(current, configGenerator.GetMapperConfiguration(i, totalMappers))); + perMapperConfiguration.Push(config); } + return perMapperConfiguration; } + /// <summary> + /// Request map evaluators from resource manager + /// </summary> + /// <param name="numEvaluators">Number of evaluators to request</param> private void RequestMapEvaluators(int numEvaluators) { _evaluatorRequestor.Submit( @@ -396,6 +500,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver .Build()); } + /// <summary> + /// Request update/master evaluator from resource manager + /// </summary> private void RequestUpdateEvaluator() { _evaluatorRequestor.Submit( http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUSystemException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUSystemException.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUSystemException.cs new file mode 100644 index 0000000..94cf619 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUSystemException.cs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.IMRU.OnREEF.Driver +{ + /// <summary> + /// Type of exception thrown when possible bugs are detected in IMRU code. + /// For example, we reach forbidden region of codes, inconsistent state etc. + /// </summary> + public sealed class IMRUSystemException : Exception + { + private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUSystemException)); + + /// <summary> + /// Constructor. Appends the user message with the message that + /// there is an issue in IMRU code. + /// </summary> + /// <param name="message">The user message for exception</param> + public IMRUSystemException(string message) + : base(AppendedMessage(message)) + { + } + + /// <summary> + /// Constructor. Appends the user message with the message that + /// there is an issue in IMRU code. Also throws the provided exception. + /// </summary> + /// <param name="message">The user message for exception</param> + /// <param name="inner">The actual exception message due to which connection failed</param> + public IMRUSystemException(string message, Exception inner) + : base(AppendedMessage(message), inner) + { + } + + private static string AppendedMessage(string message) + { + return "Possible Bug in the IMRU code: " + message; + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/MaximumNumberOfEvalutorFailuresExceededException.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/MaximumNumberOfEvalutorFailuresExceededException.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/MaximumNumberOfEvalutorFailuresExceededException.cs new file mode 100644 index 0000000..df3c5ea --- /dev/null +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/MaximumNumberOfEvalutorFailuresExceededException.cs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using System; +using Org.Apache.REEF.Utilities.Logging; + +namespace Org.Apache.REEF.IMRU.OnREEF.Driver +{ + /// <summary> + /// Type of exception thrown when number of failed evaluators reach the + /// maximum allowed limit. + /// </summary> + public sealed class MaximumNumberOfEvaluatorFailuresExceededException : Exception + { + private static readonly Logger Logger = + Logger.GetLogger(typeof(MaximumNumberOfEvaluatorFailuresExceededException)); + + /// <summary> + /// Constructor for throwing exception when the number of evaluator failures reaches maximum limit. + /// </summary> + /// <param name="maximumAllowedEvaluatorFailures">maximum number of evaluators allowed to fail</param> + public MaximumNumberOfEvaluatorFailuresExceededException(int maximumAllowedEvaluatorFailures) + : base(CreateMessage(maximumAllowedEvaluatorFailures)) + { + } + + private static string CreateMessage(int maximumAllowedEvaluatorFailures) + { + return string.Format("Exiting IMRU. Number of failed evaluators reach the maximum allowed limit of {0}", + maximumAllowedEvaluatorFailures); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs index 0cfeec3..36916db 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs @@ -17,12 +17,10 @@ using System; using System.Collections.Generic; -using System.Linq; -using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.IO.PartitionedData; -using Org.Apache.REEF.Network.Group.Config; -using Org.Apache.REEF.Network.Group.Driver; -using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities.Diagnostics; @@ -36,32 +34,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// </summary> /// <typeparam name="TMapInput"></typeparam> /// <typeparam name="TMapOutput"></typeparam> - internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput> + /// <typeparam name="TPartitionType"></typeparam> + internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> { - private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput>)); - - private readonly Dictionary<string, ContextAndServiceConfiguration> _configurationProvider; - private readonly ISet<string> _failedEvaluators; - private readonly ISet<string> _submittedEvaluators; - private readonly object _lock; - private readonly int _numNodes; - private int _assignedPartitionDescriptors; - private readonly IGroupCommDriver _groupCommDriver; - private readonly ConfigurationManager _configurationManager; - private readonly Stack<IPartitionDescriptor> _partitionDescriptors; - - internal ServiceAndContextConfigurationProvider(int numNodes, IGroupCommDriver groupCommDriver, - ConfigurationManager configurationManager, Stack<IPartitionDescriptor> partitionDescriptors) + private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>)); + + private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>(); + private readonly ISet<string> _submittedEvaluators = new HashSet<string>(); + private readonly ISet<string> _contextLoadedEvaluators = new HashSet<string>(); + private readonly object _lock = new object(); + private readonly Stack<string> _partitionDescriptorIds = new Stack<string>(); + private readonly IPartitionedInputDataSet _dataset; + private string _masterEvaluatorId; + + internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset) { - _configurationProvider = new Dictionary<string, ContextAndServiceConfiguration>(); - _failedEvaluators = new HashSet<string>(); - _submittedEvaluators = new HashSet<string>(); - _numNodes = numNodes; - _groupCommDriver = groupCommDriver; - _configurationManager = configurationManager; - _assignedPartitionDescriptors = 0; - _partitionDescriptors = partitionDescriptors; - _lock = new object(); + _dataset = dataset; + foreach (var descriptor in _dataset) + { + _partitionDescriptorIds.Push(descriptor.Id); + } } /// <summary> @@ -69,113 +61,229 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// submitted evaluator to failed evaluator /// </summary> /// <param name="evaluatorId"></param> - internal void EvaluatorFailed(string evaluatorId) + /// <returns>Whether failed evaluator is master or not</returns> + internal bool RecordEvaluatorFailureById(string evaluatorId) { lock (_lock) { + string msg; + bool isMaster = IsMasterEvaluatorId(evaluatorId); + + if (_contextLoadedEvaluators.Contains(evaluatorId)) + { + msg = + string.Format( + "Failed evaluator:{0} already had context loaded. Cannot handle failure at this stage", + evaluatorId); + Exceptions.Throw(new Exception(msg), Logger); + } + if (!_submittedEvaluators.Contains(evaluatorId)) { - Exceptions.Throw(new Exception("Failed evaluator was never submitted"), Logger); + msg = string.Format("Failed evaluator:{0} was never submitted", evaluatorId); + Exceptions.Throw(new Exception(msg), Logger); + } + + if (!_partitionIdProvider.ContainsKey(evaluatorId) && !isMaster) + { + msg = string.Format("Partition descriptor for Failed evaluator:{0} not present", evaluatorId); + Exceptions.Throw(new Exception(msg), Logger); } - _failedEvaluators.Add(evaluatorId); _submittedEvaluators.Remove(evaluatorId); + + if (isMaster) + { + Logger.Log(Level.Info, "Failed Evaluator is Master"); + _masterEvaluatorId = null; + return true; + } + + Logger.Log(Level.Info, "Failed Evaluator is a Mapper"); + _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]); + _partitionIdProvider.Remove(evaluatorId); + return false; } } /// <summary> - /// Gives context and service configuration for next evaluator either from failed - /// evaluator or new configuration + /// Notifies that active context state has been reached /// </summary> /// <param name="evaluatorId"></param> - /// <returns></returns> - internal ContextAndServiceConfiguration GetNextConfiguration(string evaluatorId) + internal void RecordActiveContextPerEvaluatorId(string evaluatorId) + { + lock (_lock) + { + if (!_submittedEvaluators.Contains(evaluatorId)) + { + var msg = string.Format("Evaluator:{0} never loaded data but still reached active context stage", + evaluatorId); + Exceptions.Throw(new Exception(msg), Logger); + } + + if (_contextLoadedEvaluators.Contains(evaluatorId)) + { + var msg = string.Format("Evaluator:{0} already reached the active context stage", evaluatorId); + Exceptions.Throw(new Exception(msg), Logger); + } + + _contextLoadedEvaluators.Add(evaluatorId); + _submittedEvaluators.Remove(evaluatorId); + } + } + + /// <summary> + /// Gets next context configuration. Either master or mapper. + /// </summary> + /// <param name="evaluatorId">Evaluator Id</param> + /// <returns>The context and service configuration</returns> + internal ContextAndServiceConfiguration GetContextConfigurationForEvaluatorById(string evaluatorId) { lock (_lock) { if (_submittedEvaluators.Contains(evaluatorId)) { - Exceptions.Throw(new Exception("The evaluator is already submitted"), Logger); + string msg = string.Format("The context is already submitted to evaluator:{0}", evaluatorId); + Exceptions.Throw(new Exception(msg), Logger); } - if (_failedEvaluators.Count == 0 && _assignedPartitionDescriptors >= _numNodes) + if (_masterEvaluatorId == null) + { + Logger.Log(Level.Info, "Submitting root context and service for master"); + _masterEvaluatorId = evaluatorId; + _submittedEvaluators.Add(evaluatorId); + return new ContextAndServiceConfiguration( + ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier, + IMRUConstants.MasterContextId).Build(), + TangFactory.GetTang().NewConfigurationBuilder().Build()); + } + + Logger.Log(Level.Info, "Submitting root context and service for a map task"); + return GetDataLoadingConfigurationForEvaluatorById(evaluatorId); + } + } + + /// <summary> + /// Checks whether evaluator id is that of master + /// </summary> + /// <param name="evaluatorId">Id of evaluator</param> + /// <returns>true if id is that of master</returns> + internal bool IsMasterEvaluatorId(string evaluatorId) + { + return evaluatorId.Equals(_masterEvaluatorId); + } + + /// <summary> + /// Gets partition Id for the evaluator + /// </summary> + /// <param name="evaluatorId"></param> + /// <returns></returns> + internal string GetPartitionIdByEvaluatorId(string evaluatorId) + { + lock (_lock) + { + string msg; + if (!_submittedEvaluators.Contains(evaluatorId) && !_contextLoadedEvaluators.Contains(evaluatorId)) { - Exceptions.Throw(new Exception("No more configuration can be provided"), Logger); + msg = string.Format("Context for Evaluator:{0} has never been submitted", evaluatorId); + Exceptions.Throw(new IMRUSystemException(msg), Logger); } - // if some failed id exists return that configuration - if (_failedEvaluators.Count != 0) + if (IsMasterEvaluatorId(evaluatorId)) { - string failedEvaluatorId = _failedEvaluators.First(); - _failedEvaluators.Remove(failedEvaluatorId); - var config = _configurationProvider[failedEvaluatorId]; - _configurationProvider.Remove(failedEvaluatorId); - _configurationProvider[evaluatorId] = config; + msg = string.Format("Evaluator:{0} is master and does not get partition", evaluatorId); + Exceptions.Throw(new IMRUSystemException(msg), Logger); } - else + + if (!_partitionIdProvider.ContainsKey(evaluatorId)) { - _assignedPartitionDescriptors++; - - if (_configurationProvider.ContainsKey(evaluatorId)) - { - Exceptions.Throw( - new Exception( - "Evaluator Id already present in configuration cache, they have to be unique"), - Logger); - } - - // Checks whether to put update task configuration or map task configuration - if (_assignedPartitionDescriptors == 1) - { - _configurationProvider[evaluatorId] = GetUpdateTaskContextAndServiceConfiguration(); - } - else - { - _configurationProvider[evaluatorId] = - GetMapTaskContextAndServiceConfiguration(_partitionDescriptors.Pop()); - } + msg = string.Format("Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId); + Exceptions.Throw(new IMRUSystemException(msg), Logger); } - _submittedEvaluators.Add(evaluatorId); - return _configurationProvider[evaluatorId]; + return _partitionIdProvider[evaluatorId]; } } - private ContextAndServiceConfiguration GetMapTaskContextAndServiceConfiguration(IPartitionDescriptor partitionDescriptor) + /// <summary> + /// Gives context and service configuration for next evaluator either from failed + /// evaluator or new configuration + /// </summary> + /// <param name="evaluatorId"></param> + /// <returns></returns> + private ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId) { - var codecConfig = - TangFactory.GetTang() - .NewConfigurationBuilder( - StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set( - StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec, - GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(), - StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(), - _configurationManager.MapInputCodecConfiguration) - .Build(); + string msg; + + if (_contextLoadedEvaluators.Contains(evaluatorId)) + { + msg = string.Format("Evaluator:{0} already has the data loaded", evaluatorId); + Exceptions.Throw(new IMRUSystemException(msg), Logger); + } + + if (_partitionDescriptorIds.Count == 0) + { + Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger); + } - var contextConf = _groupCommDriver.GetContextConfiguration(); - var serviceConf = Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig, partitionDescriptor.GetPartitionConfiguration()); + if (_partitionIdProvider.ContainsKey(evaluatorId)) + { + msg = + string.Format( + "Evaluator Id:{0} already present in configuration cache, they have to be unique", + evaluatorId); + Exceptions.Throw(new IMRUSystemException(msg), Logger); + } - return new ContextAndServiceConfiguration(contextConf, serviceConf); + Logger.Log(Level.Info, "Getting a new data loading configuration"); + _partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop(); + _submittedEvaluators.Add(evaluatorId); + + msg = string.Format( + "Current status: Submitted Evaluators-{0}, Data Loaded Evaluators-{1}, Unused data partitions-{2}", + _submittedEvaluators.Count, + _contextLoadedEvaluators.Count, + _partitionDescriptorIds.Count); + Logger.Log(Level.Info, msg); + + try + { + IPartitionDescriptor partitionDescriptor = + _dataset.GetPartitionDescriptorForId(_partitionIdProvider[evaluatorId]); + return GetDataLoadingContextAndServiceConfiguration(partitionDescriptor, evaluatorId); + } + catch (Exception e) + { + msg = string.Format("Error while trying to access partition descriptor:{0} from dataset", + _partitionIdProvider[evaluatorId]); + Exceptions.Throw(e, msg, Logger); + return null; + } } - private ContextAndServiceConfiguration GetUpdateTaskContextAndServiceConfiguration() + private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration( + IPartitionDescriptor partitionDescriptor, + string evaluatorId) { - var codecConfig = + var dataLoadingContextConf = TangFactory.GetTang() - .NewConfigurationBuilder( - new[] - { - StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Conf.Set( - StreamingCodecConfiguration<MapInputWithControlMessage<TMapInput>>.Codec, - GenericType<MapInputWithControlMessageCodec<TMapInput>>.Class).Build(), - StreamingCodecConfigurationMinusMessage<TMapOutput>.Conf.Build(), - _configurationManager.UpdateFunctionCodecsConfiguration - }) + .NewConfigurationBuilder() + .BindSetEntry<ContextConfigurationOptions.StartHandlers, DataLoadingContext<TPartitionType>, IObserver<IContextStart>>( + GenericType<ContextConfigurationOptions.StartHandlers>.Class, + GenericType<DataLoadingContext<TPartitionType>>.Class) .Build(); - var serviceConf = Configurations.Merge(_groupCommDriver.GetServiceConfiguration(), codecConfig); - return new ContextAndServiceConfiguration(_groupCommDriver.GetContextConfiguration(), serviceConf); + var serviceConf = + TangFactory.GetTang() + .NewConfigurationBuilder(ServiceConfiguration.ConfigurationModule.Build(), + dataLoadingContextConf, + partitionDescriptor.GetPartitionConfiguration()) + .Build(); + + var contextConf = ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, string.Format("DataLoading-{0}", evaluatorId)) + .Build(); + return new ContextAndServiceConfiguration(contextConf, serviceConf); } } } http://git-wip-us.apache.org/repos/asf/reef/blob/6a1b710b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj index 7d6e8c3..7c6c8f3 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj +++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj @@ -70,6 +70,9 @@ under the License. <Compile Include="OnREEF\Client\REEFIMRUClient.cs" /> <Compile Include="OnREEF\Driver\ConfigurationManager.cs" /> <Compile Include="OnREEF\Driver\ContextAndServiceConfiguration.cs" /> + <Compile Include="OnREEF\Driver\DataLoadingContext.cs" /> + <Compile Include="OnREEF\Driver\MaximumNumberOfEvalutorFailuresExceededException.cs" /> + <Compile Include="OnREEF\Driver\IMRUSystemException.cs" /> <Compile Include="OnREEF\Driver\IMRUConstants.cs" /> <Compile Include="OnREEF\Driver\IMRUDriver.cs" /> <Compile Include="OnREEF\Driver\ServiceAndContextConfigurationProvider.cs" />
