http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs index 58b75ed..dafba71 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs @@ -1,4 +1,4 @@ -// Licensed to the Apache Software Foundation (ASF) under one +// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -18,14 +18,15 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Globalization; using System.Linq; -using System.Threading; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.IMRU.API; +using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; using Org.Apache.REEF.IMRU.OnREEF.MapInputWithControlMessage; using Org.Apache.REEF.IMRU.OnREEF.Parameters; @@ -33,60 +34,94 @@ using Org.Apache.REEF.IMRU.OnREEF.ResultHandler; using Org.Apache.REEF.IO.PartitionedData; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; -using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Pipelining; using Org.Apache.REEF.Network.Group.Pipelining.Impl; using Org.Apache.REEF.Network.Group.Topology; +using Org.Apache.REEF.Network.Naming; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Implementations.Configuration; using 
Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.IMRU.OnREEF.Driver { /// <summary> - /// Implements the IMRU driver on REEF + /// Implements the IMRU driver on REEF with fault tolerant /// </summary> /// <typeparam name="TMapInput">Map Input</typeparam> /// <typeparam name="TMapOutput">Map output</typeparam> /// <typeparam name="TResult">Result</typeparam> /// <typeparam name="TPartitionType">Type of data partition (Generic type in IInputPartition)</typeparam> - internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> - : IObserver<IDriverStarted>, + internal sealed class IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType> : + IObserver<IDriverStarted>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<ICompletedTask>, IObserver<IFailedEvaluator>, IObserver<IFailedContext>, - IObserver<IFailedTask> + IObserver<IFailedTask>, + IObserver<IRunningTask>, + IObserver<IEnumerable<IActiveContext>> { private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>)); private readonly ConfigurationManager _configurationManager; private readonly int _totalMappers; - private readonly IEvaluatorRequestor _evaluatorRequestor; - private ICommunicationGroupDriver _commGroup; private readonly IGroupCommDriver _groupCommDriver; - private readonly TaskStarter _groupCommTaskStarter; - private readonly ConcurrentStack<IConfiguration> _perMapperConfiguration; - private readonly int _coresPerMapper; - private readonly int _coresForUpdateTask; - private readonly int _memoryPerMapper; - private readonly int _memoryForUpdateTask; + private readonly INameServer _nameServer; + private ConcurrentStack<IConfiguration> _perMapperConfigurationStack; private readonly 
ISet<IPerMapperConfigGenerator> _perMapperConfigs; - private readonly ISet<ICompletedTask> _completedTasks = new HashSet<ICompletedTask>(); - private readonly int _allowedFailedEvaluators; - private int _currentFailedEvaluators = 0; private readonly bool _invokeGC; - private int _numberOfReadyTasks = 0; + private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> _serviceAndContextConfigurationProvider; - private readonly ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> - _serviceAndContextConfigurationProvider; + /// <summary> + /// The lock for the driver. + /// </summary> + private readonly object _lock = new object(); + + /// <summary> + /// Manages Tasks, maintains task states and responsible for task submission for the driver. + /// </summary> + private TaskManager _taskManager; + + /// <summary> + /// Manages Active Contexts for the driver. + /// </summary> + private readonly ActiveContextManager _contextManager; + + /// <summary> + /// Manages allocated and failed Evaluators for driver. + /// </summary> + private readonly EvaluatorManager _evaluatorManager; + + /// <summary> + /// Defines the max retry number for recoveries. It is configurable for the driver. + /// </summary> + private readonly int _maxRetryNumberForFaultTolerant; + + /// <summary> + /// System State of the driver. + /// <see href="https://issues.apache.org/jira/browse/REEF-1223"></see> + /// </summary> + private SystemStateMachine _systemState; + + /// <summary> + /// Shows if the driver is first try. Once the system enters recovery, it is set to false. + /// </summary> + private bool _isFirstTry = true; + + /// <summary> + /// It records the number of retry for the recoveries. 
+ /// </summary> + private int _numberOfRetries; + + private const int DefaultMaxNumberOfRetryInRecovery = 3; [Inject] private IMRUDriver(IPartitionedInputDataSet dataSet, @@ -98,181 +133,517 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver [Parameter(typeof(MemoryPerMapper))] int memoryPerMapper, [Parameter(typeof(MemoryForUpdateTask))] int memoryForUpdateTask, [Parameter(typeof(AllowedFailedEvaluatorsFraction))] double failedEvaluatorsFraction, + [Parameter(typeof(MaxRetryNumberInRecovery))] int maxRetryNumberInRecovery, [Parameter(typeof(InvokeGC))] bool invokeGC, - IGroupCommDriver groupCommDriver) + IGroupCommDriver groupCommDriver, + INameServer nameServer) { _configurationManager = configurationManager; - _evaluatorRequestor = evaluatorRequestor; _groupCommDriver = groupCommDriver; - _coresPerMapper = coresPerMapper; - _coresForUpdateTask = coresForUpdateTask; - _memoryPerMapper = memoryPerMapper; - _memoryForUpdateTask = memoryForUpdateTask; + _nameServer = nameServer; _perMapperConfigs = perMapperConfigs; _totalMappers = dataSet.Count; - - _allowedFailedEvaluators = (int)(failedEvaluatorsFraction * dataSet.Count); _invokeGC = invokeGC; + _maxRetryNumberForFaultTolerant = maxRetryNumberInRecovery > 0 ? 
maxRetryNumberInRecovery : DefaultMaxNumberOfRetryInRecovery; + + _contextManager = new ActiveContextManager(_totalMappers + 1); + _contextManager.Subscribe(this); + + var updateSpec = new EvaluatorSpecification(memoryForUpdateTask, coresForUpdateTask); + var mapperSpec = new EvaluatorSpecification(memoryPerMapper, coresPerMapper); + var allowedFailedEvaluators = (int)(failedEvaluatorsFraction * _totalMappers); + _evaluatorManager = new EvaluatorManager(_totalMappers + 1, allowedFailedEvaluators, evaluatorRequestor, updateSpec, mapperSpec); - AddGroupCommunicationOperators(); - _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _totalMappers + 1); - _perMapperConfiguration = ConstructPerMapperConfigStack(_totalMappers); + _systemState = new SystemStateMachine(); _serviceAndContextConfigurationProvider = new ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>(dataSet); var msg = - string.Format("map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}", - _memoryPerMapper, - _memoryForUpdateTask, - _coresPerMapper, - _coresForUpdateTask); + string.Format(CultureInfo.InvariantCulture, "map task memory:{0}, update task memory:{1}, map task cores:{2}, update task cores:{3}, maxRetry {4}, allowedFailedEvaluators {5}.", + memoryPerMapper, + memoryForUpdateTask, + coresPerMapper, + coresForUpdateTask, + _maxRetryNumberForFaultTolerant, + allowedFailedEvaluators); Logger.Log(Level.Info, msg); } + #region IDriverStarted /// <summary> - /// Requests for evaluator for update task + /// Requests evaluators when driver starts /// </summary> /// <param name="value">Event fired when driver started</param> public void OnNext(IDriverStarted value) { - RequestUpdateEvaluator(); //// TODO[REEF-598]: Set a timeout for this request to be satisfied. If it is not within that time, exit the Driver. 
+ _evaluatorManager.RequestUpdateEvaluator(); + _evaluatorManager.RequestMapEvaluators(_totalMappers); } + #endregion IDriverStarted + #region IAllocatedEvaluator /// <summary> - /// Specifies context and service configuration for evaluator depending - /// on whether it is for Update function or for map function + /// IAllocatedEvaluator handler. It will take the following action based on the system state: + /// Case WaitingForEvaluator + /// Add Evaluator to the Evaluator Manager + /// submit Context and Services + /// Case Fail + /// Do nothing. This is because the code that sets system Fail has executed FailedAction. It has shut down all the allocated evaluators/contexts. + /// If a new IAllocatedEvaluator comes after it, we should not submit anything so that the evaluator is returned. + /// Other cases - not expected /// </summary> /// <param name="allocatedEvaluator">The allocated evaluator</param> public void OnNext(IAllocatedEvaluator allocatedEvaluator) { - var configs = - _serviceAndContextConfigurationProvider.GetContextConfigurationForEvaluatorById(allocatedEvaluator.Id); - allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service); + Logger.Log(Level.Info, "AllocatedEvaluator EvaluatorBatchId [{0}], memory [{1}], systemState {2}.", allocatedEvaluator.EvaluatorBatchId, allocatedEvaluator.GetEvaluatorDescriptor().Memory, _systemState.CurrentState); + lock (_lock) + { + using (Logger.LogFunction("IMRUDriver::IAllocatedEvaluator")) + { + switch (_systemState.CurrentState) + { + case SystemState.WaitingForEvaluator: + _evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator); + SubmitContextAndService(allocatedEvaluator); + break; + case SystemState.Fail: + Logger.Log(Level.Info, + "Receiving IAllocatedEvaluator event, but system is in FAIL state, ignore it."); + allocatedEvaluator.Dispose(); + break; + default: + UnexpectedState(allocatedEvaluator.Id, "IAllocatedEvaluator"); + break; + } + } + } } /// <summary> - /// Specifies the Map or 
Update task to run on the active context + /// Gets context and service configuration for evaluator depending + /// on whether it is for update/master function or for mapper function. + /// Then submits Context and Service with the corresponding configuration /// </summary> - /// <param name="activeContext"></param> - public void OnNext(IActiveContext activeContext) + /// <param name="allocatedEvaluator"></param> + private void SubmitContextAndService(IAllocatedEvaluator allocatedEvaluator) { - Logger.Log(Level.Verbose, string.Format("Received Active Context {0}", activeContext.Id)); - - if (_serviceAndContextConfigurationProvider.IsMasterEvaluatorId(activeContext.EvaluatorId)) + ContextAndServiceConfiguration configs; + if (_evaluatorManager.IsEvaluatorForMaster(allocatedEvaluator)) { - Logger.Log(Level.Verbose, "Submitting master task"); - _commGroup.AddTask(IMRUConstants.UpdateTaskName); - _groupCommTaskStarter.QueueTask(GetUpdateTaskConfiguration(), activeContext); - RequestMapEvaluators(_totalMappers); + configs = + _serviceAndContextConfigurationProvider + .GetContextConfigurationForMasterEvaluatorById( + allocatedEvaluator.Id); } else { - Logger.Log(Level.Verbose, "Submitting map task"); - _serviceAndContextConfigurationProvider.RecordActiveContextPerEvaluatorId(activeContext.EvaluatorId); - string taskId = GetTaskIdByEvaluatorId(activeContext.EvaluatorId); - _commGroup.AddTask(taskId); - _groupCommTaskStarter.QueueTask(GetMapTaskConfiguration(activeContext, taskId), activeContext); - Interlocked.Increment(ref _numberOfReadyTasks); - Logger.Log(Level.Verbose, string.Format("{0} Tasks are ready for submission", _numberOfReadyTasks)); + configs = _serviceAndContextConfigurationProvider + .GetDataLoadingConfigurationForEvaluatorById( + allocatedEvaluator.Id); } + allocatedEvaluator.SubmitContextAndService(configs.Context, configs.Service); } + #endregion IAllocatedEvaluator + #region IActiveContext /// <summary> - /// Specifies what to do when the task is 
completed - /// In this case just disposes off the task + /// IActiveContext handler. It will take the following actions based on the system state: + /// Case WaitingForEvaluator: + /// Adds Active Context to Active Context Manager + /// Case Fail: + /// Closes the ActiveContext + /// Other cases - not expected /// </summary> - /// <param name="completedTask">The link to the completed task</param> - public void OnNext(ICompletedTask completedTask) + /// <param name="activeContext"></param> + public void OnNext(IActiveContext activeContext) { - lock (_completedTasks) + Logger.Log(Level.Info, "Received Active Context {0}, systemState {1}.", activeContext.Id, _systemState.CurrentState); + lock (_lock) { - Logger.Log(Level.Info, - string.Format("Received completed task message from task Id: {0}", completedTask.Id)); - _completedTasks.Add(completedTask); - - if (AreIMRUTasksCompleted()) + using (Logger.LogFunction("IMRUDriver::IActiveContext")) { - ShutDownAllEvaluators(); + switch (_systemState.CurrentState) + { + case SystemState.WaitingForEvaluator: + _contextManager.Add(activeContext); + break; + case SystemState.Fail: + Logger.Log(Level.Info, + "Received IActiveContext event, but system is in FAIL state. Closing the context."); + activeContext.Dispose(); + break; + default: + UnexpectedState(activeContext.Id, "IActiveContext"); + break; + } } } } + #endregion IActiveContext + #region submit tasks /// <summary> - /// Specifies what to do when evaluator fails. - /// If we get all completed tasks then ignore the failure - /// Else request a new evaluator. If failure happens in middle of IMRU - /// job we expect neighboring evaluators to fail while doing - /// communication and will use FailedTask and FailedContext logic to - /// order shutdown. + /// Called from ActiveContextManager when all the expected active context are received. + /// It changes the system state then calls SubmitTasks(). 
/// </summary> /// <param name="value"></param> - public void OnNext(IFailedEvaluator value) + public void OnNext(IEnumerable<IActiveContext> value) { - if (AreIMRUTasksCompleted()) + Logger.Log(Level.Info, "Received event from ActiveContextManager with NumberOfActiveContexts:" + (value != null ? value.Count() : 0)); + lock (_lock) { - Logger.Log(Level.Info, - string.Format("Evaluator with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id)); - return; + // When the event AllContextsAreReady happens, change the system state from WaitingForEvaluator to SubmittingTasks + _systemState.MoveNext(SystemStateEvent.AllContextsAreReady); + SubmitTasks(value); } + } - Logger.Log(Level.Info, - string.Format("Evaluator with Id: {0} failed with Exception: {1}", value.Id, value.EvaluatorException)); - int currFailedEvaluators = Interlocked.Increment(ref _currentFailedEvaluators); - if (currFailedEvaluators > _allowedFailedEvaluators) + /// <summary> + /// This method is responsible to prepare for the task submission then call SubmitTasks in TaskManager. + /// It is called in both first time and recovery scenarios. + /// Creates a new Communication Group and adds Group Communication Operators + /// For each context, adds a task to the communication group. + /// After all the tasks are added to the group, for each task, gets GroupCommTaskConfiguration from IGroupCommDriver + /// and merges it with the task configuration. + /// When all the tasks are added, calls TaskManager to SubmitTasks(). 
+ /// </summary> + private void SubmitTasks(IEnumerable<IActiveContext> activeContexts) + { + Logger.Log(Level.Info, "SubmitTasks with system state : {0} at time: {1}.", _systemState.CurrentState, DateTime.Now); + using (Logger.LogFunction("IMRUDriver::SubmitTasksConfiguration")) { - Exceptions.Throw(new MaximumNumberOfEvaluatorFailuresExceededException(_allowedFailedEvaluators), - Logger); - } + if (!_isFirstTry) + { + _groupCommDriver.RemoveCommunicationGroup(IMRUConstants.CommunicationGroupName); + } - _serviceAndContextConfigurationProvider.RecordEvaluatorFailureById(value.Id); - bool isMaster = _serviceAndContextConfigurationProvider.IsMasterEvaluatorId(value.Id); + UpdateMaterTaskId(); + _taskManager = new TaskManager(_totalMappers + 1, _groupCommDriver.MasterTaskId); + var commGroup = AddCommunicationGroupWithOperators(); + _perMapperConfigurationStack = ConstructPerMapperConfigStack(_totalMappers); + + var taskIdAndContextMapping = new Dictionary<string, IActiveContext>(); + foreach (var activeContext in activeContexts) + { + var taskId = _evaluatorManager.IsMasterEvaluatorId(activeContext.EvaluatorId) + ? _groupCommDriver.MasterTaskId + : GetMapperTaskIdByEvaluatorId(activeContext.EvaluatorId); + commGroup.AddTask(taskId); + taskIdAndContextMapping.Add(taskId, activeContext); + } + + foreach (var mapping in taskIdAndContextMapping) + { + var taskConfig = _evaluatorManager.IsMasterEvaluatorId(mapping.Value.EvaluatorId) + ? 
GetMasterTaskConfiguration(mapping.Key) + : GetMapperTaskConfiguration(mapping.Value, mapping.Key); + var groupCommTaskConfiguration = _groupCommDriver.GetGroupCommTaskConfiguration(mapping.Key); + var mergedTaskConf = Configurations.Merge(taskConfig, groupCommTaskConfiguration); + _taskManager.AddTask(mapping.Key, mergedTaskConf, mapping.Value); + } + } + _taskManager.SubmitTasks(); + } - // If failed evaluator is master then ask for master - // evaluator else ask for mapper evaluator - if (!isMaster) + private void UpdateMaterTaskId() + { + if (_isFirstTry) { - Logger.Log(Level.Info, string.Format("Requesting a replacement map Evaluator for {0}", value.Id)); - RequestMapEvaluators(1); + _groupCommDriver.MasterTaskId = _groupCommDriver.MasterTaskId + "-" + _numberOfRetries; } else { - Logger.Log(Level.Info, string.Format("Requesting a replacement master Evaluator for {0}", value.Id)); - RequestUpdateEvaluator(); + _groupCommDriver.MasterTaskId = + _groupCommDriver.MasterTaskId.Substring(0, _groupCommDriver.MasterTaskId.Length - 1) + + _numberOfRetries; } } + #endregion submit tasks + #region IRunningTask /// <summary> - /// Specifies what to do if Failed Context is received. - /// An exception is thrown if tasks are not completed. + /// IRunningTask handler. The method is called when a task is running. 
The following action will be taken based on the system state: + /// Case SubmittingTasks + /// Add it to RunningTasks and set task state to TaskRunning + /// When all the tasks are running, change system state to TasksRunning + /// Case ShuttingDown/Fail + /// Call TaskManager to record RunningTask during SystemFailure + /// Other cases - not expected /// </summary> - /// <param name="value"></param> - public void OnNext(IFailedContext value) + /// <param name="runningTask"></param> + public void OnNext(IRunningTask runningTask) { - if (AreIMRUTasksCompleted()) + Logger.Log(Level.Info, "Received IRunningTask {0} from endpoint {1} at SystemState {2} retry # {3}.", runningTask.Id, GetEndPointFromTaskId(runningTask.Id), _systemState.CurrentState, _numberOfRetries); + lock (_lock) { - Logger.Log(Level.Info, - string.Format("Context with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id)); - return; + using (Logger.LogFunction("IMRUDriver::IRunningTask")) + { + switch (_systemState.CurrentState) + { + case SystemState.SubmittingTasks: + _taskManager.RecordRunningTask(runningTask); + if (_taskManager.AreAllTasksRunning()) + { + _systemState.MoveNext(SystemStateEvent.AllTasksAreRunning); + Logger.Log(Level.Info, + "All tasks are running, SystemState {0}", + _systemState.CurrentState); + } + break; + case SystemState.ShuttingDown: + case SystemState.Fail: + _taskManager.RecordRunningTaskDuringSystemFailure(runningTask, TaskManager.CloseTaskByDriver); + break; + default: + UnexpectedState(runningTask.Id, "IRunningTask"); + break; + } + } } - Exceptions.Throw(new Exception(string.Format("Data Loading Context with Id: {0} failed", value.Id)), Logger); } + #endregion IRunningTask + #region ICompletedTask /// <summary> - /// Specifies what to do if a task fails. - /// We throw the exception and fail IMRU unless IMRU job is already done. + /// ICompletedTask handler. It is called when a task is completed. 
The following action will be taken based on the System State: + /// Case TasksRunning + /// Updates task state to TaskCompleted + /// If all tasks are completed, sets system state to TasksCompleted and then go to Done action + /// Case ShuttingDown + /// Updates task state to TaskCompleted + /// Try to recover + /// Other cases - not expected /// </summary> - /// <param name="value"></param> - public void OnNext(IFailedTask value) + /// <param name="completedTask">The link to the completed task</param> + public void OnNext(ICompletedTask completedTask) { - if (AreIMRUTasksCompleted()) + Logger.Log(Level.Info, "Received ICompletedTask {0}, with systemState {1} in retry# {2}.", completedTask.Id, _systemState.CurrentState, _numberOfRetries); + lock (_lock) { - Logger.Log(Level.Info, - string.Format("Task with Id: {0} failed but IMRU task is completed. So ignoring.", value.Id)); - return; + switch (_systemState.CurrentState) + { + case SystemState.TasksRunning: + _taskManager.RecordCompletedTask(completedTask); + if (_taskManager.AreAllTasksCompleted()) + { + _systemState.MoveNext(SystemStateEvent.AllTasksAreCompleted); + Logger.Log(Level.Info, "All tasks are completed, systemState {0}", _systemState.CurrentState); + DoneAction(); + } + break; + case SystemState.ShuttingDown: + // The task might be in running state or waiting for close, record the completed task + _taskManager.RecordCompletedTask(completedTask); + TryRecovery(); + break; + default: + UnexpectedState(completedTask.Id, "ICompletedTask"); + break; + } + } + } + #endregion ICompletedTask + + #region IFailedEvaluator + /// <summary> + /// IFailedEvaluator handler. It specifies what to do when an evaluator fails. + /// If we get all completed tasks then ignore the failure. Otherwise, take the following actions based on the system state: + /// Case WaitingForEvaluator + /// This happens in the middle of submitting contexts. 
We just need to remove the failed evaluator + /// from EvaluatorManager and remove associated active context, if any, from ActiveContextManager + /// then checks if the system is recoverable. If yes, request another Evaluator + /// If not recoverable, set system state to Fail then execute Fail action + /// Case SubmittingTasks/TasksRunning + /// This happens either in the middle of Task submitting or all the tasks are running + /// Changes the system state to ShuttingDown + /// Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager + /// Removes associated task from running task if it was running and change the task state to TaskFailedByEvaluatorFailure + /// Closes all the other running tasks + /// Try to recover in case it is the last failure received + /// Case ShuttingDown + /// This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing. + /// Removes Evaluator and associated context from EvaluatorManager and ActiveContextManager + /// Removes associated task from running task if it was running, changes the task state to ClosedTask if it was waiting for close + /// otherwise changes the task state to FailedTaskEvaluatorError + /// Try to recover in case it is the last failure received + /// Other cases - not expected + /// </summary> + /// <param name="failedEvaluator"></param> + public void OnNext(IFailedEvaluator failedEvaluator) + { + var endpoint = failedEvaluator.FailedTask.IsPresent() + ? GetEndPoint(failedEvaluator.FailedTask.Value) + : failedEvaluator.FailedContexts.Any() + ? 
GetEndPointFromContext(failedEvaluator.FailedContexts.First()) + : "unknown_endpoint"; + + Logger.Log(Level.Warning, "Received IFailedEvaluator {0} from endpoint {1} with systemState {2} in retry# {3} with Exception: {4}.", failedEvaluator.Id, endpoint, _systemState.CurrentState, _numberOfRetries, failedEvaluator.EvaluatorException); + + lock (_lock) + { + using (Logger.LogFunction("IMRUDriver::IFailedEvaluator")) + { + if (_taskManager != null && _taskManager.AreAllTasksCompleted()) + { + Logger.Log(Level.Verbose, + "All IMRU tasks have been completed. So ignoring the Evaluator {0} failure.", + failedEvaluator.Id); + return; + } + + var isMaster = _evaluatorManager.IsMasterEvaluatorId(failedEvaluator.Id); + _evaluatorManager.RecordFailedEvaluator(failedEvaluator.Id); + _contextManager.RemoveFailedContextInFailedEvaluator(failedEvaluator); + + switch (_systemState.CurrentState) + { + case SystemState.WaitingForEvaluator: + if (!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures()) + { + if (isMaster) + { + Logger.Log(Level.Info, "Requesting a master Evaluator."); + _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id); + _evaluatorManager.RequestUpdateEvaluator(); + } + else + { + _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider( + failedEvaluator.Id); + Logger.Log(Level.Info, "Requesting mapper Evaluators."); + _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id); + _evaluatorManager.RequestMapEvaluators(1); + } + } + else + { + Logger.Log(Level.Error, "The system is not recoverable, change the state to Fail."); + _systemState.MoveNext(SystemStateEvent.NotRecoverable); + FailAction(); + } + break; + + case SystemState.SubmittingTasks: + case SystemState.TasksRunning: + // When the event FailedNode happens, change the system state to ShuttingDown + _systemState.MoveNext(SystemStateEvent.FailedNode); + _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator); + 
_taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + + // Push evaluator id back to PartitionIdProvider if it is not master + if (!isMaster) + { + _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider( + failedEvaluator.Id); + } + + TryRecovery(); + break; + + case SystemState.ShuttingDown: + _taskManager.RecordTaskFailWhenReceivingFailedEvaluator(failedEvaluator); + + // Push evaluator id back to PartitionIdProvider if it is not master + if (!isMaster) + { + _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider( + failedEvaluator.Id); + } + TryRecovery(); + break; + + case SystemState.Fail: + break; + + default: + UnexpectedState(failedEvaluator.Id, "IFailedEvaluator"); + break; + } + } + } + } + #endregion IFailedEvaluator + + #region IFailedContext + /// <summary> + /// IFailedContext handler. It specifies what to do if Failed Context is received. + /// If we get all completed tasks then ignore the failure otherwise throw exception + /// Fault tolerant would be similar to FailedEvaluator. + /// </summary> + /// <param name="failedContext"></param> + public void OnNext(IFailedContext failedContext) + { + lock (_lock) + { + if (_taskManager.AreAllTasksCompleted()) + { + Logger.Log(Level.Info, "Context with Id: {0} failed but IMRU tasks are completed. So ignoring.", failedContext.Id); + return; + } + + var msg = string.Format("Context with Id: {0} failed with Evaluator id: {1}", failedContext.Id, failedContext.EvaluatorId); + Exceptions.Throw(new Exception(msg), Logger); + } + } + #endregion IFailedContext + + #region IFailedTask + /// <summary> + /// IFailedTask handler. It specifies what to do when task fails. + /// If we get all completed tasks then ignore the failure. 
Otherwise take the following actions based on the System state: + /// Case SubmittingTasks/TasksRunning + /// This is the first failure received + /// Changes the system state to ShuttingDown + /// Record failed task in TaskManager + /// Closes all the other running tasks and set their state to TaskWaitingForClose + /// Try to recover + /// Case ShuttingDown + /// This happens when we have received either FailedEvaluator or FailedTask, some tasks are running some are in closing. + /// Record failed task in TaskManager. + /// Try to recover + /// Other cases - not expected + /// </summary> + /// <param name="failedTask"></param> + public void OnNext(IFailedTask failedTask) + { + Logger.Log(Level.Warning, "Received IFailedTask with Id: {0} and message: {1} from endpoint {2} with systemState {3} in retry#: {4}.", failedTask.Id, failedTask.Message, GetEndPointFromContext(failedTask.GetActiveContext()), _systemState.CurrentState, _numberOfRetries); + lock (_lock) + { + using (Logger.LogFunction("IMRUDriver::IFailedTask")) + { + if (_taskManager.AreAllTasksCompleted()) + { + Logger.Log(Level.Info, + "Task with Id: {0} failed but all IMRU tasks are completed. 
So ignoring.", + failedTask.Id); + return; + } + + switch (_systemState.CurrentState) + { + case SystemState.SubmittingTasks: + case SystemState.TasksRunning: + // When the event FailedNode happens, change the system state to ShuttingDown + _systemState.MoveNext(SystemStateEvent.FailedNode); + _taskManager.RecordFailedTaskDuringRunningOrSubmissionState(failedTask); + _taskManager.CloseAllRunningTasks(TaskManager.CloseTaskByDriver); + TryRecovery(); + break; + + case SystemState.ShuttingDown: + _taskManager.RecordFailedTaskDuringSystemShuttingDownState(failedTask); + TryRecovery(); + break; + + default: + UnexpectedState(failedTask.Id, "IFailedTask"); + break; + } + } } - Exceptions.Throw(new Exception(string.Format("Task with Id: {0} failed", value.Id)), Logger); } + #endregion IFailedTask public void OnError(Exception error) { @@ -282,42 +653,154 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver { } - private bool AreIMRUTasksCompleted() + private void UnexpectedState(string id, string eventName) { - return _completedTasks.Count >= _totalMappers + 1; + var msg = string.Format(CultureInfo.InvariantCulture, + "Received {0} for [{1}], but system status is {2}.", + eventName, + id, + _systemState.CurrentState); + Exceptions.Throw(new IMRUSystemException(msg), Logger); } - private string GetTaskIdByEvaluatorId(string evaluatorId) + /// <summary> + /// If all the tasks are in final state, if the system is recoverable, start recovery + /// else, change the system state to Fail then take Fail action + /// </summary> + private void TryRecovery() { - return string.Format("{0}-{1}-Version0", + if (_taskManager.AreAllTasksInFinalState()) + { + if (IsRecoverable()) + { + _isFirstTry = false; + RecoveryAction(); + } + else + { + Logger.Log(Level.Warning, "The system is not recoverable, change the state to Fail."); + _systemState.MoveNext(SystemStateEvent.NotRecoverable); + FailAction(); + } + } + } + + private string GetMapperTaskIdByEvaluatorId(string evaluatorId) + { + return 
string.Format("{0}-{1}-{2}", IMRUConstants.MapTaskPrefix, - _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId)); + _serviceAndContextConfigurationProvider.GetPartitionIdByEvaluatorId(evaluatorId), + _numberOfRetries); + } + + /// <summary> + /// This method is called when all the tasks are successfully completed. + /// </summary> + private void DoneAction() + { + ShutDownAllEvaluators(); + Logger.Log(Level.Info, "DoneAction done in retry {0}!!!", _numberOfRetries); + } + + /// <summary> + /// This method is called when there are failures and the system is not recoverable. + /// </summary> + private void FailAction() + { + ShutDownAllEvaluators(); + var msg = string.Format(CultureInfo.InvariantCulture, + "The system cannot be recovered after {0} retries. NumberofFailedMappers in the last try is {1}.", + _numberOfRetries, _evaluatorManager.NumberofFailedMappers()); + Exceptions.Throw(new ApplicationException(msg), Logger); } /// <summary> - /// Shuts down evaluators once all completed task messages are received + /// Shuts down evaluators /// </summary> private void ShutDownAllEvaluators() { - foreach (var task in _completedTasks) + foreach (var context in _contextManager.ActiveContexts) { - Logger.Log(Level.Info, string.Format("Disposing task: {0}", task.Id)); - task.ActiveContext.Dispose(); + Logger.Log(Level.Verbose, "Disposing active context: {0}", context.Id); + context.Dispose(); + } + } + + /// <summary> + /// This method is called for recovery. It resets Failed Evaluators and changes state to WaitingForEvaluator + /// If there is no failed mappers, meaning the recovery is caused by failed tasks, resubmit all the tasks. 
+ /// Else, based on the number of failed evaluators, requests missing map evaluators + /// </summary> + private void RecoveryAction() + { + lock (_lock) + { + _numberOfRetries++; + var msg = string.Format(CultureInfo.InvariantCulture, + "Start recovery with _numberOfRetryForFaultTolerant {0}, NumberofFailedMappers {1}.", + _numberOfRetries, + _evaluatorManager.NumberofFailedMappers()); + Logger.Log(Level.Info, msg); + + _systemState.MoveNext(SystemStateEvent.Recover); + + var mappersToRequest = _evaluatorManager.NumberofFailedMappers(); + _evaluatorManager.ResetFailedEvaluators(); + + if (mappersToRequest == 0) + { + Logger.Log(Level.Info, "There is no failed Evaluator in this recovery but failed tasks."); + if (_contextManager.AreAllContextsReceived) + { + OnNext(_contextManager.ActiveContexts); + } + else + { + Exceptions.Throw(new IMRUSystemException("In recovery, there are no Failed evaluators but not all the contexts are received"), Logger); + } + } + else + { + Logger.Log(Level.Info, "Requesting {0} map Evaluators.", mappersToRequest); + _evaluatorManager.RequestMapEvaluators(mappersToRequest); + } } } /// <summary> - /// Generates map task configuration given the active context. + /// Checks if the system is recoverable. 
+ /// </summary> + /// <returns></returns> + private bool IsRecoverable() + { + var msg = string.Format(CultureInfo.InvariantCulture, + "IsRecoverable: _numberOfRetryForFaultTolerant {0}, NumberofFailedMappers {1}, NumberOfAppErrors {2}, IsMasterEvaluatorFailed {3} AllowedNumberOfEvaluatorFailures {4}, _maxRetryNumberForFaultTolerant {5}.", + _numberOfRetries, + _evaluatorManager.NumberofFailedMappers(), + _taskManager.NumberOfAppErrors(), + _evaluatorManager.IsMasterEvaluatorFailed(), + _evaluatorManager.AllowedNumberOfEvaluatorFailures, + _maxRetryNumberForFaultTolerant); + Logger.Log(Level.Info, msg); + + return !_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures() + && _taskManager.NumberOfAppErrors() == 0 + && !_evaluatorManager.IsMasterEvaluatorFailed() + && _numberOfRetries < _maxRetryNumberForFaultTolerant; + } + + /// <summary> + /// Generates map task configuration given the active context. /// Merge configurations of all the inputs to the MapTaskHost. /// </summary> /// <param name="activeContext">Active context to which task needs to be submitted</param> /// <param name="taskId">Task Id</param> /// <returns>Map task configuration</returns> - private IConfiguration GetMapTaskConfiguration(IActiveContext activeContext, string taskId) + private IConfiguration GetMapperTaskConfiguration(IActiveContext activeContext, string taskId) { IConfiguration mapSpecificConfig; - if (!_perMapperConfiguration.TryPop(out mapSpecificConfig)) + if (!_perMapperConfigurationStack.TryPop(out mapSpecificConfig)) { Exceptions.Throw( new IMRUSystemException(string.Format("No per map configuration exist for the active context {0}", @@ -343,13 +826,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// Merge configurations of all the inputs to the UpdateTaskHost. 
/// </summary> /// <returns>Update task configuration</returns> - private IConfiguration GetUpdateTaskConfiguration() + private IConfiguration GetMasterTaskConfiguration(string taskId) { var partialTaskConf = TangFactory.GetTang() .NewConfigurationBuilder(TaskConfiguration.ConfigurationModule .Set(TaskConfiguration.Identifier, - IMRUConstants.UpdateTaskName) + taskId) .Set(TaskConfiguration.Task, GenericType<UpdateTaskHost<TMapInput, TMapOutput, TResult>>.Class) .Set(TaskConfiguration.OnClose, @@ -384,8 +867,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Generate the group communicaiton configuration to be added - /// to the tasks + /// Creates the group communication configuration to be added to the tasks /// </summary> /// <returns>The group communication configuration</returns> private IConfiguration GetGroupCommConfiguration() @@ -406,12 +888,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <summary> /// Adds broadcast and reduce operators to the default communication group /// </summary> - private void AddGroupCommunicationOperators() + private ICommunicationGroupDriver AddCommunicationGroupWithOperators() { var reduceFunctionConfig = _configurationManager.ReduceFunctionConfiguration; var mapOutputPipelineDataConverterConfig = _configurationManager.MapOutputPipelineDataConverterConfiguration; var mapInputPipelineDataConverterConfig = _configurationManager.MapInputPipelineDataConverterConfiguration; + // TODO check the specific exception type try { TangFactory.GetTang() @@ -452,25 +935,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver .Build(); } - _commGroup = - _groupCommDriver.DefaultGroup + var commGroup = + _groupCommDriver.NewCommunicationGroup(IMRUConstants.CommunicationGroupName, _totalMappers + 1) .AddBroadcast<MapInputWithControlMessage<TMapInput>>( IMRUConstants.BroadcastOperatorName, - IMRUConstants.UpdateTaskName, + _groupCommDriver.MasterTaskId, TopologyTypes.Tree, mapInputPipelineDataConverterConfig) 
.AddReduce<TMapOutput>( IMRUConstants.ReduceOperatorName, - IMRUConstants.UpdateTaskName, + _groupCommDriver.MasterTaskId, TopologyTypes.Tree, reduceFunctionConfig, mapOutputPipelineDataConverterConfig) .Build(); + + return commGroup; } /// <summary> - /// Construct the stack of map configuraion which - /// is specific to each mapper. If user does not + /// Construct the stack of map configuration which is specific to each mapper. If user does not /// specify any then its empty configuration /// </summary> /// <param name="totalMappers">Total mappers</param> @@ -490,30 +974,47 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Request map evaluators from resource manager + /// look up endpoint for given id /// </summary> - /// <param name="numEvaluators">Number of evaluators to request</param> - private void RequestMapEvaluators(int numEvaluators) + /// <param name="taskId">Registered identifier in name server></param> + /// <returns></returns> + private string GetEndPointFromTaskId(string taskId) { - _evaluatorRequestor.Submit( - _evaluatorRequestor.NewBuilder() - .SetMegabytes(_memoryPerMapper) - .SetNumber(numEvaluators) - .SetCores(_coresPerMapper) - .Build()); + List<string> t = new List<string>(); + t.Add(taskId); + var ips = _nameServer.Lookup(t); + if (ips.Count > 0) + { + var ip = ips.FirstOrDefault(); + if (ip != null) + { + return ip.Endpoint.ToString(); + } + } + return null; } - /// <summary> - /// Request update/master evaluator from resource manager - /// </summary> - private void RequestUpdateEvaluator() - { - _evaluatorRequestor.Submit( - _evaluatorRequestor.NewBuilder() - .SetCores(_coresForUpdateTask) - .SetMegabytes(_memoryForUpdateTask) - .SetNumber(1) - .Build()); + private string GetEndPoint(IFailedTask failedTask) + { + return GetEndPointFromTaskId(failedTask.Id) ?? 
GetEndPointFromContext(failedTask.GetActiveContext()); + } + + private string GetEndPointFromContext(IFailedContext context) + { + if (context == null || context.EvaluatorDescriptor == null || context.EvaluatorDescriptor.NodeDescriptor == null) + { + return null; + } + return context.EvaluatorDescriptor.NodeDescriptor.HostName; + } + + private string GetEndPointFromContext(Optional<IActiveContext> context) + { + if (!context.IsPresent() || context.Value == null || context.Value.EvaluatorDescriptor == null || context.Value.EvaluatorDescriptor.NodeDescriptor == null) + { + return null; + } + return context.Value.EvaluatorDescriptor.NodeDescriptor.HostName; } } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs index 36916db..24a2b9b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/ServiceAndContextConfigurationProvider.cs @@ -17,36 +17,38 @@ using System; using System.Collections.Generic; +using System.Globalization; using Org.Apache.REEF.Common.Context; using Org.Apache.REEF.Common.Events; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.IO.PartitionedData; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Diagnostics; using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.IMRU.OnREEF.Driver { /// <summary> - /// Class that handles failed evaluators and also provides Service - /// and Context configuration + /// Class that provides Service and Context configuration /// </summary> /// <typeparam name="TMapInput"></typeparam> /// <typeparam name="TMapOutput"></typeparam> /// <typeparam name="TPartitionType"></typeparam> + [NotThreadSafe] internal sealed class ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType> { private static readonly Logger Logger = Logger.GetLogger(typeof(ServiceAndContextConfigurationProvider<TMapInput, TMapOutput, TPartitionType>)); private readonly Dictionary<string, string> _partitionIdProvider = new Dictionary<string, string>(); - private readonly ISet<string> _submittedEvaluators = new HashSet<string>(); - private readonly ISet<string> _contextLoadedEvaluators = new HashSet<string>(); - private readonly 
object _lock = new object(); private readonly Stack<string> _partitionDescriptorIds = new Stack<string>(); private readonly IPartitionedInputDataSet _dataset; - private string _masterEvaluatorId; + /// <summary> + /// Constructs the object which maintains partitionDescriptor Ids in order to provide proper data load configuration + /// </summary> + /// <param name="dataset"></param> internal ServiceAndContextConfigurationProvider(IPartitionedInputDataSet dataset) { _dataset = dataset; @@ -57,120 +59,34 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Handles failed evaluator. Moves the id from - /// submitted evaluator to failed evaluator + /// Handles failed evaluator. Push the partitionId back to Partition Descriptor Id stack and + /// remove evaluatorId from Partition Id Provider collection /// </summary> /// <param name="evaluatorId"></param> /// <returns>Whether failed evaluator is master or not</returns> - internal bool RecordEvaluatorFailureById(string evaluatorId) + internal void RemoveEvaluatorIdFromPartitionIdProvider(string evaluatorId) { - lock (_lock) + if (!_partitionIdProvider.ContainsKey(evaluatorId)) { - string msg; - bool isMaster = IsMasterEvaluatorId(evaluatorId); - - if (_contextLoadedEvaluators.Contains(evaluatorId)) - { - msg = - string.Format( - "Failed evaluator:{0} already had context loaded. 
Cannot handle failure at this stage", - evaluatorId); - Exceptions.Throw(new Exception(msg), Logger); - } - - if (!_submittedEvaluators.Contains(evaluatorId)) - { - msg = string.Format("Failed evaluator:{0} was never submitted", evaluatorId); - Exceptions.Throw(new Exception(msg), Logger); - } - - if (!_partitionIdProvider.ContainsKey(evaluatorId) && !isMaster) - { - msg = string.Format("Partition descriptor for Failed evaluator:{0} not present", evaluatorId); - Exceptions.Throw(new Exception(msg), Logger); - } - - _submittedEvaluators.Remove(evaluatorId); - - if (isMaster) - { - Logger.Log(Level.Info, "Failed Evaluator is Master"); - _masterEvaluatorId = null; - return true; - } - - Logger.Log(Level.Info, "Failed Evaluator is a Mapper"); - _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]); - _partitionIdProvider.Remove(evaluatorId); - return false; + var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for Failed evaluator:{0} not present", evaluatorId); + Exceptions.Throw(new Exception(msg), Logger); } + _partitionDescriptorIds.Push(_partitionIdProvider[evaluatorId]); + _partitionIdProvider.Remove(evaluatorId); } /// <summary> - /// Notifies that active context state has been reached + /// Gets Context and Service configuration for Master /// </summary> /// <param name="evaluatorId"></param> - internal void RecordActiveContextPerEvaluatorId(string evaluatorId) - { - lock (_lock) - { - if (!_submittedEvaluators.Contains(evaluatorId)) - { - var msg = string.Format("Evaluator:{0} never loaded data but still reached active context stage", - evaluatorId); - Exceptions.Throw(new Exception(msg), Logger); - } - - if (_contextLoadedEvaluators.Contains(evaluatorId)) - { - var msg = string.Format("Evaluator:{0} already reached the active context stage", evaluatorId); - Exceptions.Throw(new Exception(msg), Logger); - } - - _contextLoadedEvaluators.Add(evaluatorId); - _submittedEvaluators.Remove(evaluatorId); - } - } - - /// <summary> - 
/// Gets next context configuration. Either master or mapper. - /// </summary> - /// <param name="evaluatorId">Evaluator Id</param> - /// <returns>The context and service configuration</returns> - internal ContextAndServiceConfiguration GetContextConfigurationForEvaluatorById(string evaluatorId) - { - lock (_lock) - { - if (_submittedEvaluators.Contains(evaluatorId)) - { - string msg = string.Format("The context is already submitted to evaluator:{0}", evaluatorId); - Exceptions.Throw(new Exception(msg), Logger); - } - - if (_masterEvaluatorId == null) - { - Logger.Log(Level.Info, "Submitting root context and service for master"); - _masterEvaluatorId = evaluatorId; - _submittedEvaluators.Add(evaluatorId); - return new ContextAndServiceConfiguration( - ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier, - IMRUConstants.MasterContextId).Build(), - TangFactory.GetTang().NewConfigurationBuilder().Build()); - } - - Logger.Log(Level.Info, "Submitting root context and service for a map task"); - return GetDataLoadingConfigurationForEvaluatorById(evaluatorId); - } - } - - /// <summary> - /// Checks whether evaluator id is that of master - /// </summary> - /// <param name="evaluatorId">Id of evaluator</param> - /// <returns>true if id is that of master</returns> - internal bool IsMasterEvaluatorId(string evaluatorId) + /// <returns></returns> + internal ContextAndServiceConfiguration GetContextConfigurationForMasterEvaluatorById(string evaluatorId) { - return evaluatorId.Equals(_masterEvaluatorId); + Logger.Log(Level.Info, "Getting root context and service configuration for master"); + return new ContextAndServiceConfiguration( + ContextConfiguration.ConfigurationModule.Set(ContextConfiguration.Identifier, + IMRUConstants.MasterContextId).Build(), + TangFactory.GetTang().NewConfigurationBuilder().Build()); } /// <summary> @@ -180,29 +96,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <returns></returns> internal string 
GetPartitionIdByEvaluatorId(string evaluatorId) { - lock (_lock) + if (!_partitionIdProvider.ContainsKey(evaluatorId)) { - string msg; - if (!_submittedEvaluators.Contains(evaluatorId) && !_contextLoadedEvaluators.Contains(evaluatorId)) - { - msg = string.Format("Context for Evaluator:{0} has never been submitted", evaluatorId); - Exceptions.Throw(new IMRUSystemException(msg), Logger); - } - - if (IsMasterEvaluatorId(evaluatorId)) - { - msg = string.Format("Evaluator:{0} is master and does not get partition", evaluatorId); - Exceptions.Throw(new IMRUSystemException(msg), Logger); - } - - if (!_partitionIdProvider.ContainsKey(evaluatorId)) - { - msg = string.Format("Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId); - Exceptions.Throw(new IMRUSystemException(msg), Logger); - } - - return _partitionIdProvider[evaluatorId]; + var msg = string.Format(CultureInfo.InvariantCulture, "Partition descriptor for evaluator:{0} is not present in the mapping", evaluatorId); + Exceptions.Throw(new IMRUSystemException(msg), Logger); } + + return _partitionIdProvider[evaluatorId]; } /// <summary> @@ -211,16 +111,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// </summary> /// <param name="evaluatorId"></param> /// <returns></returns> - private ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId) + internal ContextAndServiceConfiguration GetDataLoadingConfigurationForEvaluatorById(string evaluatorId) { - string msg; - - if (_contextLoadedEvaluators.Contains(evaluatorId)) - { - msg = string.Format("Evaluator:{0} already has the data loaded", evaluatorId); - Exceptions.Throw(new IMRUSystemException(msg), Logger); - } - if (_partitionDescriptorIds.Count == 0) { Exceptions.Throw(new IMRUSystemException("No more data configuration can be provided"), Logger); @@ -228,8 +120,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver if (_partitionIdProvider.ContainsKey(evaluatorId)) { - msg = + var msg = string.Format( 
+ CultureInfo.InvariantCulture, "Evaluator Id:{0} already present in configuration cache, they have to be unique", evaluatorId); Exceptions.Throw(new IMRUSystemException(msg), Logger); @@ -237,14 +130,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver Logger.Log(Level.Info, "Getting a new data loading configuration"); _partitionIdProvider[evaluatorId] = _partitionDescriptorIds.Pop(); - _submittedEvaluators.Add(evaluatorId); - - msg = string.Format( - "Current status: Submitted Evaluators-{0}, Data Loaded Evaluators-{1}, Unused data partitions-{2}", - _submittedEvaluators.Count, - _contextLoadedEvaluators.Count, - _partitionDescriptorIds.Count); - Logger.Log(Level.Info, msg); try { @@ -254,14 +139,20 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } catch (Exception e) { - msg = string.Format("Error while trying to access partition descriptor:{0} from dataset", + var msg = string.Format(CultureInfo.InvariantCulture, "Error while trying to access partition descriptor:{0} from dataset", _partitionIdProvider[evaluatorId]); Exceptions.Throw(e, msg, Logger); return null; } } - private ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration( + /// <summary> + /// Creates service and data loading context configuration for given evaluator id + /// </summary> + /// <param name="partitionDescriptor"></param> + /// <param name="evaluatorId"></param> + /// <returns></returns> + private static ContextAndServiceConfiguration GetDataLoadingContextAndServiceConfiguration( IPartitionDescriptor partitionDescriptor, string evaluatorId) { @@ -286,4 +177,4 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver return new ContextAndServiceConfiguration(contextConf, serviceConf); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs index 3bf6d75..a37fa3b 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/TaskManager.cs @@ -1,4 +1,4 @@ -// Licensed to the Apache Software Foundation (ASF) under one +// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -15,15 +15,18 @@ // specific language governing permissions and limitations // under the License. +using System; using System.Collections.Generic; using System.Globalization; using System.Linq; using System.Text; +using Org.Apache.REEF.Common.Runtime.Evaluator.Task; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; using Org.Apache.REEF.Driver.Task; using Org.Apache.REEF.IMRU.OnREEF.Driver.StateMachine; using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; +using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Diagnostics; @@ -202,7 +205,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <summary> /// This method is called when receiving ICompletedTask event during task running or system shutting down. 
/// Removes the task from running tasks if it was running - /// Changes the task state from RunningTask to CompletedTask + /// Changes the task state from RunningTask to CompletedTask if the task was running + /// Change the task stat from TaskWaitingForClose to TaskClosedByDriver if the task was in TaskWaitingForClose state /// </summary> /// <param name="completedTask"></param> internal void RecordCompletedTask(ICompletedTask completedTask) @@ -233,6 +237,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <param name="failedTask"></param> internal void RecordFailedTaskDuringSystemShuttingDownState(IFailedTask failedTask) { + Logger.Log(Level.Info, "RecordFailedTaskDuringSystemShuttingDownState, exceptionType: {0}", GetTaskErrorEventByExceptionType(failedTask).ToString()); + var taskState = GetTaskState(failedTask.Id); if (taskState == StateMachine.TaskState.TaskWaitingForClose) { @@ -260,7 +266,9 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver { if (!_runningTasks.ContainsKey(taskId)) { - var msg = string.Format(CultureInfo.InvariantCulture, "The task [{0}] doesn't exist in Running Tasks.", taskId); + var msg = string.Format(CultureInfo.InvariantCulture, + "The task [{0}] doesn't exist in Running Tasks.", + taskId); Exceptions.Throw(new IMRUSystemException(msg), Logger); } _runningTasks.Remove(taskId); @@ -268,6 +276,20 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError); } + else + { + var taskId = FindTaskAssociatedWithTheEvalutor(failedEvaluator.Id); + var taskState = GetTaskState(taskId); + if (taskState == StateMachine.TaskState.TaskSubmitted) + { + UpdateState(taskId, TaskStateEvent.FailedTaskEvaluatorError); + } + } + } + + private string FindTaskAssociatedWithTheEvalutor(string evaluatorId) + { + return _tasks.Where(e => e.Value.ActiveContext.EvaluatorId.Equals(evaluatorId)).Select(e => e.Key).FirstOrDefault(); } /// <summary> @@ -346,13 +368,37 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } 
/// <summary> - /// Gets error type based on the exception type in IFailedTask + /// Gets error type (encoded as TaskStateEvent) based on the exception type in IFailedTask. + /// Unknown exceptions, or exceptions that don't belong to the defined IMRU task exceptions, + /// are treated as system errors (FailedTaskSystemError). /// </summary> /// <param name="failedTask"></param> /// <returns></returns> private TaskStateEvent GetTaskErrorEventByExceptionType(IFailedTask failedTask) { var exception = failedTask.AsError(); + var innerExceptionType = exception.InnerException != null ? exception.InnerException.GetType().ToString() : "InnerException null"; + var innerExceptionMsg = exception.InnerException != null ? exception.InnerException.Message : "No InnerException"; + + + if (failedTask.GetActiveContext().IsPresent()) + { + Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType: with task id: {0}, exception type {1}, innerException type {2}, InnerExceptionMessage {3}, evaluator id: {4}", + failedTask.Id, + exception.GetType(), + innerExceptionType, + innerExceptionMsg, + failedTask.GetActiveContext().Value.EvaluatorId); + } + else + { + Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType: with task id: {0}, exception type {1}, innerException type {2}, InnerExceptionMessage {3}", + failedTask.Id, + exception.GetType(), + innerExceptionType, + innerExceptionMsg); + } + if (exception is IMRUTaskAppException) { _numberOfAppErrors++; @@ -362,10 +408,26 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver { return TaskStateEvent.FailedTaskCommunicationError; } - else + if (exception is IMRUTaskSystemException) { return TaskStateEvent.FailedTaskSystemError; } + + // special case for communication error during group communication initialization + if (exception is TaskClientCodeException) + { + // try extract cause and check whether it is InjectionException for GroupCommClient + if (exception.InnerException != null && + exception.InnerException is InjectionException && + 
exception.InnerException.Message.Contains("GroupCommClient")) + { + Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType:FailedTaskCommunicationError with task id {0}", failedTask.Id); + return TaskStateEvent.FailedTaskCommunicationError; + } + } + + Logger.Log(Level.Info, "GetTaskErrorEventByExceptionType for un-hanlded exception with task id {0} and exception type {1}", failedTask.Id, exception.GetType()); + return TaskStateEvent.FailedTaskSystemError; } /// <summary> @@ -381,9 +443,38 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// Checks if all the tasks are in final states /// </summary> /// <returns></returns> - internal bool AllInFinalState() + internal bool AreAllTasksInFinalState() { - return _tasks.All(t => t.Value.TaskState.IsFinalState()); + var notInFinalState = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Take(5).ToList(); + var count = _tasks.Where(t => !t.Value.TaskState.IsFinalState()).Count(); + + if (notInFinalState.Any()) + { + Logger.Log(Level.Info, "Total tasks that are not in final state: {0}, and first 5 are:\r\n {1}", count, string.Join("\r\n", notInFinalState.Select(ToLog))); + } + else + { + Logger.Log(Level.Info, "All the tasks are in final state"); + } + + return !notInFinalState.Any(); + } + + private string ToLog(KeyValuePair<string, TaskInfo> t) + { + try + { + return string.Format("State={0}, taskId={1}, ContextId={2}, evaluatorId={3}, evaluatorHost={4}", + t.Value.TaskState.CurrentState, + t.Key, + t.Value.ActiveContext.Id, + t.Value.ActiveContext.EvaluatorId, + t.Value.ActiveContext.EvaluatorDescriptor.NodeDescriptor.HostName); + } + catch (Exception ex) + { + return string.Format("Failed to get task string: {0}", ex); + } } /// <summary> @@ -415,18 +506,37 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// </summary> internal void SubmitTasks() { - if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists()) + using (Logger.LogFunction("TaskManager::SubmitTasks")) { - string msg = string.Format("Trying to 
submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].", NumberOfTasks, _totalExpectedTasks); - Exceptions.Throw(new IMRUSystemException(msg), Logger); - } + if (NumberOfTasks < _totalExpectedTasks || !MasterTaskExists()) + { + string msg = + string.Format( + "Trying to submit tasks but either master task doesn't exist or number of tasks [{0}] is smaller than expected number of tasks [{1}].", + NumberOfTasks, + _totalExpectedTasks); + Exceptions.Throw(new IMRUSystemException(msg), Logger); + } - foreach (var taskId in _tasks.Keys) - { - var taskInfo = GetTaskInfo(taskId); - taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration); - UpdateState(taskId, TaskStateEvent.SubmittedTask); - } + SubmitTask(_masterTaskId); + + foreach (var taskId in _tasks.Keys) + { + if (taskId.Equals(_masterTaskId)) + { + continue; + } + SubmitTask(taskId); + } + } + } + + private void SubmitTask(string taskId) + { + Logger.Log(Level.Info, "SubmitTask with task id: {0}.", taskId); + var taskInfo = GetTaskInfo(taskId); + taskInfo.ActiveContext.SubmitTask(taskInfo.TaskConfiguration); + UpdateState(taskId, TaskStateEvent.SubmittedTask); } /// <summary> http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs index bce1e4d..ca2fb85 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/MapTaskHost.cs @@ -17,7 +17,8 @@ using System; using System.IO; -using System.Text; +using System.Net.Sockets; +using System.Runtime.Remoting; using System.Threading; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Common.Tasks.Events; @@ -75,13 +76,16 @@ namespace 
Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <param name="groupCommunicationsClient">Used to setup the communications.</param> /// <param name="taskCloseCoordinator">Task close Coordinator</param> /// <param name="invokeGC">Whether to call Garbage Collector after each iteration or not</param> + /// <param name="taskId">task id</param> [Inject] private MapTaskHost( IMapFunction<TMapInput, TMapOutput> mapTask, IGroupCommClient groupCommunicationsClient, TaskCloseCoordinator taskCloseCoordinator, - [Parameter(typeof(InvokeGC))] bool invokeGC) + [Parameter(typeof(InvokeGC))] bool invokeGC, + [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId) { + Logger.Log(Level.Info, "Entering constructor of MapTaskHost for task id {0}", taskId); _mapTask = mapTask; _groupCommunicationsClient = groupCommunicationsClient; var cg = groupCommunicationsClient.GetCommunicationGroup(IMRUConstants.CommunicationGroupName); @@ -91,6 +95,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks _invokeGC = invokeGC; _taskCloseCoordinator = taskCloseCoordinator; _cancellationSource = new CancellationTokenSource(); + Logger.Log(Level.Info, "MapTaskHost initialized."); } /// <summary> @@ -100,21 +105,22 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <returns></returns> public byte[] Call(byte[] memento) { + Logger.Log(Level.Info, "Entering MapTaskHost Call()."); MapControlMessage controlMessage = MapControlMessage.AnotherRound; - - while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop) + try { - if (_invokeGC) + while (!_cancellationSource.IsCancellationRequested && controlMessage != MapControlMessage.Stop) { - Logger.Log(Level.Verbose, "Calling Garbage Collector"); - GC.Collect(); - GC.WaitForPendingFinalizers(); - } + if (_invokeGC) + { + Logger.Log(Level.Verbose, "Calling Garbage Collector"); + GC.Collect(); + GC.WaitForPendingFinalizers(); + } - try - { using ( - MapInputWithControlMessage<TMapInput> mapInput = 
_dataAndMessageReceiver.Receive(_cancellationSource)) + MapInputWithControlMessage<TMapInput> mapInput = + _dataAndMessageReceiver.Receive(_cancellationSource)) { controlMessage = mapInput.ControlMessage; if (controlMessage != MapControlMessage.Stop) @@ -123,32 +129,77 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks } } } - catch (OperationCanceledException e) - { - Logger.Log(Level.Warning, "Received OperationCanceledException in MapTaskHost with message: {0}.", e.Message); - break; - } - catch (IOException e) + } + catch (OperationCanceledException e) + { + Logger.Log(Level.Warning, + "Received OperationCanceledException in MapTaskHost with message: {0}. The cancellation token is: {1}.", + e.Message, + _cancellationSource.IsCancellationRequested); + } + catch (Exception e) + { + if (e is IOException || e is TcpClientConnectionException || e is RemotingException || + e is SocketException) { - Logger.Log(Level.Error, "Received IOException in MapTaskHost with message: {0}.", e.Message); + Logger.Log(Level.Error, + "Received Exception {0} in MapTaskHost with message: {1}. The cancellation token is: {2}.", + e.GetType(), + e.Message, + _cancellationSource.IsCancellationRequested); if (!_cancellationSource.IsCancellationRequested) { + Logger.Log(Level.Error, + "MapTask is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.", + _cancellationSource.IsCancellationRequested); throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); } - break; } - catch (TcpClientConnectionException e) + else if (e is AggregateException) + { + Logger.Log(Level.Error, + "Received AggregateException. 
The cancellation token is: {0}.", + _cancellationSource.IsCancellationRequested); + if (e.InnerException != null) + { + Logger.Log(Level.Error, + "InnerException {0}, with message {1}.", + e.InnerException.GetType(), + e.InnerException.Message); + } + if (!_cancellationSource.IsCancellationRequested) + { + if (e.InnerException != null && e.InnerException is IOException) + { + Logger.Log(Level.Error, + "MapTask is throwing IMRUTaskGroupCommunicationException with cancellation token: {0}.", + _cancellationSource.IsCancellationRequested); + throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); + } + else + { + throw e; + } + } + } + else { - Logger.Log(Level.Error, "Received TcpClientConnectionException in MapTaskHost with message: {0}.", e.Message); + Logger.Log(Level.Error, + "MapTask is throwing Exception {0}, message {1} with cancellation token: {2} and StackTrace {3}.", + e.GetType(), + e.Message, + _cancellationSource.IsCancellationRequested, + e.StackTrace); if (!_cancellationSource.IsCancellationRequested) { - throw new IMRUTaskGroupCommunicationException(TaskManager.TaskGroupCommunicationError); + throw e; } - break; } } - - _taskCloseCoordinator.SignalTaskStopped(); + finally + { + _taskCloseCoordinator.SignalTaskStopped(); + } Logger.Log(Level.Info, "MapTaskHost returned with cancellation token:{0}.", _cancellationSource.IsCancellationRequested); return null; } http://git-wip-us.apache.org/repos/asf/reef/blob/b14c8cd8/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs index a9014c3..af20809 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/TaskCloseCoordinator.cs @@ -15,12 +15,9 @@ // specific 
language governing permissions and limitations // under the License. -using System; using System.Text; using System.Threading; using Org.Apache.REEF.Common.Tasks.Events; -using Org.Apache.REEF.IMRU.OnREEF.Driver; -using Org.Apache.REEF.IMRU.OnREEF.Parameters; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; @@ -56,6 +53,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.IMRUTasks /// <param name="cancellationTokenSource"></param> internal void HandleEvent(ICloseEvent closeEvent, CancellationTokenSource cancellationTokenSource) { + Logger.Log(Level.Info, "HandleEvent: The task received close event"); cancellationTokenSource.Cancel(); _waitToCloseEvent.Wait();
