Repository: reef Updated Branches: refs/heads/master 80d6a1f6a -> f92218572
[REEF-1380] Avoid a NullReferenceException in HeartbeatManager This addressed the issue by * Checking for DONE states before sending heartbeat. * Only accessing _driverConnection when operation state is RECOVERY. * Using InjectionFuture to instantiate _driverConnection. JIRA: [REEF-1380](https://issues.apache.org/jira/browse/REEF-1380) Pull Request: This closes #989 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/f9221857 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/f9221857 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/f9221857 Branch: refs/heads/master Commit: f922185724dcde9c9a40e80b45687e32097ffc5f Parents: 80d6a1f Author: Andrew Chung <[email protected]> Authored: Wed May 4 16:17:36 2016 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu May 5 10:32:30 2016 -0700 ---------------------------------------------------------------------- .../Runtime/Evaluator/EvaluatorSettings.cs | 10 +--- .../Runtime/Evaluator/HeartBeatManager.cs | 62 +++++++++++--------- 2 files changed, 37 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/f9221857/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs index d561cd3..62d5372 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs @@ -50,7 +50,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <param name="clock"></param> /// <param name="remoteManagerFactory"></param> /// <param name="reefMessageCodec"></param> - /// <param name="injector"></param> [Inject] private EvaluatorSettings( 
[Parameter(typeof(ApplicationIdentifier))] string applicationId, @@ -59,10 +58,9 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries, RuntimeClock clock, IRemoteManagerFactory remoteManagerFactory, - REEFMessageCodec reefMessageCodec, - IInjector injector) : + REEFMessageCodec reefMessageCodec) : this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, - clock, remoteManagerFactory, reefMessageCodec, injector, null) + clock, remoteManagerFactory, reefMessageCodec, null) { } @@ -75,7 +73,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator RuntimeClock clock, IRemoteManagerFactory remoteManagerFactory, REEFMessageCodec reefMessageCodec, - IInjector injector, INameClient nameClient) { _applicationId = applicationId; @@ -85,7 +82,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator _clock = clock; _remoteManager = remoteManagerFactory.GetInstance(reefMessageCodec); - EvaluatorInjector = injector; OperationState = EvaluatorOperationState.OPERATIONAL; NameClient = nameClient; } @@ -162,7 +158,5 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { get { return _remoteManager; } } - - public IInjector EvaluatorInjector { get; private set; } } } http://git-wip-us.apache.org/repos/asf/reef/blob/f9221857/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs index db422b1..2c8064e 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs @@ -27,7 +27,9 @@ using Org.Apache.REEF.Common.Runtime.Evaluator.Context; using Org.Apache.REEF.Common.Runtime.Evaluator.Utils; using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Annotations; +using 
Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Implementations.InjectionPlan; +using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; @@ -59,7 +61,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private int _heartbeatFailures = 0; - private IDriverConnection _driverConnection; + private readonly IInjectionFuture<IDriverConnection> _driverConnection; private readonly EvaluatorSettings _evaluatorSettings; @@ -77,7 +79,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator EvaluatorSettings settings, IInjectionFuture<EvaluatorRuntime> evaluatorRuntime, IInjectionFuture<ContextManager> contextManager, - [Parameter(typeof(ErrorHandlerRid))] string errorHandlerRid) + [Parameter(typeof(ErrorHandlerRid))] string errorHandlerRid, + IInjectionFuture<IDriverConnection> driverConnection) { using (LOGGER.LogFunction("HeartBeatManager::HeartBeatManager")) { @@ -90,6 +93,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator _clock = settings.RuntimeClock; _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs; _maxHeartbeatRetries = settings.MaxHeartbeatRetries; + _driverConnection = driverConnection; MachineStatus.ToString(); // kick start the CPU perf counter } } @@ -154,14 +158,6 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator LOGGER.Log(Level.Info, "=========== Entering RECOVERY mode. 
==========="); ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Disconnected)); - try - { - _driverConnection = _evaluatorSettings.EvaluatorInjector.GetInstance<IDriverConnection>(); - } - catch (Exception ex) - { - Utilities.Diagnostics.Exceptions.CaughtAndThrow(ex, Level.Error, "Failed to inject the driver reconnect implementation", LOGGER); - } LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection); _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY; @@ -254,38 +250,50 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator lock (this) { LOGGER.Log(Level.Verbose, "HeartbeatManager::OnNext(Alarm)"); + if (_evaluatorSettings.OperationState == EvaluatorOperationState.OPERATIONAL && EvaluatorRuntime.State == State.RUNNING) { EvaluatorHeartbeatProto evaluatorHeartbeatProto = GetEvaluatorHeartbeatProto(); LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Triggered a heartbeat: {0}. {1}Node Health: {2}", evaluatorHeartbeatProto, Environment.NewLine, MachineStatus.ToString())); Send(evaluatorHeartbeatProto); - _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this); } else { - LOGGER.Log(Level.Verbose, string.Format(CultureInfo.InvariantCulture, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. ", EvaluatorSettings.OperationState, EvaluatorRuntime.State)); - try + LOGGER.Log(Level.Verbose, "Ignoring regular heartbeat since Evaluator operation state is [{0}] and runtime state is [{1}]. 
", EvaluatorSettings.OperationState, EvaluatorRuntime.State); + + if (EvaluatorRuntime.State == State.DONE || EvaluatorRuntime.State == State.FAILED || EvaluatorRuntime.State == State.KILLED) { - DriverInformation driverInformation = _driverConnection.GetDriverInformation(); - if (driverInformation == null) + return; + } + + if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY) + { + var driverConnection = _driverConnection.Get(); + + try { - LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later."); + var driverInformation = driverConnection.GetDriverInformation(); + if (driverInformation == null) + { + LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later."); + } + else + { + LOGGER.Log( + Level.Info, + string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId)); + Recover(driverInformation); + } } - else + catch (Exception e) { - LOGGER.Log( - Level.Info, - string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId)); - Recover(driverInformation); + // we do not want any exception to stop the query for driver status + Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER); } } - catch (Exception e) - { - // we do not want any exception to stop the query for driver status - Org.Apache.REEF.Utilities.Diagnostics.Exceptions.Caught(e, Level.Warning, LOGGER); - } - _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this); } + + _clock.ScheduleAlarm(_heartBeatPeriodInMillSeconds, this); } }
