Repository: reef Updated Branches: refs/heads/master c0ddcc9c9 -> dd14f0910
[REEF-1679] Evaluator shouldn't go to recovery mode if there is no reconnect logic provided. If there is no IDriverConnection bound, instead of throwing an exception after a few quick retries, we will increase the number of retries and finally exit the evaluator if it reaches the max retry limit. If there is an IDriverConnection implemented, we will increase the max retry number, since a network glitch could last for more than a few seconds. JIRA: [REEF-1679](https://issues.apache.org/jira/browse/REEF-1679) Pull request: This closes #1193 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/dd14f091 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/dd14f091 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/dd14f091 Branch: refs/heads/master Commit: dd14f091050b9e8f22cff3627a8be686fd91350e Parents: c0ddcc9 Author: Julia Wang <[email protected]> Authored: Tue Nov 29 18:34:38 2016 -0800 Committer: Mariia Mykhailova <[email protected]> Committed: Fri Dec 2 13:59:23 2016 -0800 ---------------------------------------------------------------------- .../Org.Apache.REEF.Common.csproj | 1 + .../Runtime/Evaluator/EvaluatorSettings.cs | 18 ++++- .../Runtime/Evaluator/HeartBeatManager.cs | 70 ++++++++++++++------ .../Evaluator/Parameters/HeartbeatMaxRetry.cs | 2 +- .../HeartbeatMaxRetryForNonRecoveryMode.cs | 26 ++++++++ 5 files changed, 94 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj index 8399e43..936bc01 100644 --- a/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj +++ 
b/lang/cs/Org.Apache.REEF.Common/Org.Apache.REEF.Common.csproj @@ -199,6 +199,7 @@ under the License. <Compile Include="Runtime\Evaluator\HeartBeatManager.cs" /> <Compile Include="Runtime\Evaluator\IHeartBeatManager.cs" /> <Compile Include="Runtime\Evaluator\Parameters\EvaluatorHeartbeatPeriodInMs.cs" /> + <Compile Include="Runtime\Evaluator\Parameters\HeartbeatMaxRetryForNonRecoveryMode.cs" /> <Compile Include="Runtime\Evaluator\Parameters\HeartbeatMaxRetry.cs" /> <Compile Include="Runtime\Evaluator\PIDStoreHandler.cs" /> <Compile Include="Runtime\Evaluator\ReefMessageProtoObserver.cs" /> http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs index 126ce02..cc58a5a 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/EvaluatorSettings.cs @@ -35,6 +35,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private readonly string _evaluatorId; private readonly int _heartBeatPeriodInMs; private readonly int _maxHeartbeatRetries; + private readonly int _maxHeartbeatRetriesForNonrecoveryMode; private readonly IClock _clock; private readonly IRemoteManager<REEFMessage> _remoteManager; @@ -45,6 +46,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator /// <param name="evaluatorId"></param> /// <param name="heartbeatPeriodInMs"></param> /// <param name="maxHeartbeatRetries"></param> + /// <param name="maxHeartbeatRetriesForNonRecoveryMode">Max retry number for non HA mode</param> /// <param name="clock"></param> /// <param name="remoteManagerFactory"></param> /// <param name="reefMessageCodec"></param> @@ -54,10 +56,11 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator 
[Parameter(typeof(EvaluatorIdentifier))] string evaluatorId, [Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs, [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries, + [Parameter(typeof(HeartbeatMaxRetryForNonRecoveryMode))] int maxHeartbeatRetriesForNonRecoveryMode, IClock clock, IRemoteManagerFactory remoteManagerFactory, REEFMessageCodec reefMessageCodec) : - this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, + this(applicationId, evaluatorId, heartbeatPeriodInMs, maxHeartbeatRetries, maxHeartbeatRetriesForNonRecoveryMode, clock, remoteManagerFactory, reefMessageCodec, null) { } @@ -68,6 +71,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator [Parameter(typeof(EvaluatorIdentifier))] string evaluatorId, [Parameter(typeof(EvaluatorHeartbeatPeriodInMs))] int heartbeatPeriodInMs, [Parameter(typeof(HeartbeatMaxRetry))] int maxHeartbeatRetries, + [Parameter(typeof(HeartbeatMaxRetryForNonRecoveryMode))] int maxHeartbeatRetriesForNonRecoveryMode, IClock clock, IRemoteManagerFactory remoteManagerFactory, REEFMessageCodec reefMessageCodec, @@ -77,6 +81,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator _evaluatorId = evaluatorId; _heartBeatPeriodInMs = heartbeatPeriodInMs; _maxHeartbeatRetries = maxHeartbeatRetries; + _maxHeartbeatRetriesForNonrecoveryMode = maxHeartbeatRetriesForNonRecoveryMode; _clock = clock; _remoteManager = remoteManagerFactory.GetInstance(reefMessageCodec); @@ -134,6 +139,17 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator } /// <summary> + /// Return MaxHeartbeatRetriesForNonrecoveryMode from NamedParameter + /// </summary> + public int MaxHeartbeatRetriesForNonRecoveryMode + { + get + { + return _maxHeartbeatRetriesForNonrecoveryMode; + } + } + + /// <summary> /// return Runtime Clock injected from the constructor /// </summary> public IClock RuntimeClock 
http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs index 0f3fbd6..889c67c 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/HeartBeatManager.cs @@ -29,7 +29,6 @@ using Org.Apache.REEF.Common.Tasks; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Implementations.InjectionPlan; -using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Utilities; using Org.Apache.REEF.Utilities.Attributes; using Org.Apache.REEF.Utilities.Logging; @@ -55,6 +54,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private readonly int _maxHeartbeatRetries = 0; + private readonly int _maxHeartbeatRetriesForNonRecoveryMode = 0; + private IRemoteIdentifier _remoteId; private IObserver<REEFMessage> _observer; @@ -95,6 +96,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator _clock = settings.RuntimeClock; _heartBeatPeriodInMillSeconds = settings.HeartBeatPeriodInMs; _maxHeartbeatRetries = settings.MaxHeartbeatRetries; + _maxHeartbeatRetriesForNonRecoveryMode = settings.MaxHeartbeatRetriesForNonRecoveryMode; _driverConnection = driverConnection; MachineStatus.ToString(); // kick start the CPU perf counter } @@ -152,7 +154,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator try { _observer.OnNext(payload); - _heartbeatFailures = 0; // reset failure counts if we are having intermidtten (not continuous) failures + _heartbeatFailures = 0; // reset failure counts if we are having intermittent (not continuous) failures } catch (Exception e) { @@ -166,17 +168,35 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator 
_queuedHeartbeats.Enqueue(evaluatorHeartbeatProto); LOGGER.Log(Level.Error, string.Format(CultureInfo.InvariantCulture, "Sending heartbeat to driver experienced #{0} failure. Hearbeat queued as: [{1}]. ", _heartbeatFailures, evaluatorHeartbeatProto), e); - if (_heartbeatFailures >= _maxHeartbeatRetries) + if (_driverConnection.Get() is MissingDriverConnection) { - LOGGER.Log(Level.Warning, "Heartbeat communications to driver reached max of {0} failures. Driver is considered dead/unreachable", _heartbeatFailures); - LOGGER.Log(Level.Info, "=========== Entering RECOVERY mode. ==========="); - ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Disconnected)); - - LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection); - _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY; - - // clean heartbeat failure - _heartbeatFailures = 0; + if (_heartbeatFailures >= _maxHeartbeatRetriesForNonRecoveryMode) + { + var msg = + string.Format(CultureInfo.InvariantCulture, + "Have encountered {0} heartbeat failures. Limit of heartbeat sending failures exceeded. Driver reconnect logic is not implemented, failing evaluator.", + _heartbeatFailures); + LOGGER.Log(Level.Error, msg); + throw new ReefRuntimeException(msg, e); + } + } + else + { + if (_heartbeatFailures >= _maxHeartbeatRetries) + { + LOGGER.Log(Level.Warning, + "Heartbeat communications to driver reached max of {0} failures. 
Driver is considered dead/unreachable", + _heartbeatFailures); + LOGGER.Log(Level.Info, "Entering RECOVERY mode!!!"); + ContextManager.HandleDriverConnectionMessage( + new DriverConnectionMessageImpl(DriverConnectionState.Disconnected)); + + LOGGER.Log(Level.Info, "instantiate driver reconnect implementation: " + _driverConnection); + _evaluatorSettings.OperationState = EvaluatorOperationState.RECOVERY; + + // clean heartbeat failure + _heartbeatFailures = 0; + } } } } @@ -284,22 +304,30 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator if (_evaluatorSettings.OperationState == EvaluatorOperationState.RECOVERY) { var driverConnection = _driverConnection.Get(); - try { var driverInformation = driverConnection.GetDriverInformation(); if (driverInformation == null) { - LOGGER.Log(Level.Verbose, "In RECOVERY mode, cannot retrieve driver information, will try again later."); + LOGGER.Log(Level.Verbose, + "In RECOVERY mode, cannot retrieve driver information, will try again later."); } else { - LOGGER.Log( - Level.Info, - string.Format(CultureInfo.InvariantCulture, "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. Now trying to re-establish connection", driverInformation.DriverStartTime, driverInformation.DriverRemoteIdentifier, driverInformation.NameServerId)); + var msg = string.Format(CultureInfo.InvariantCulture, + "Detect driver restarted at {0} and is running on endpoint {1} with services {2}. 
Now trying to re-establish connection", + driverInformation.DriverStartTime, + driverInformation.DriverRemoteIdentifier, + driverInformation.NameServerId); + LOGGER.Log(Level.Info, msg); Recover(driverInformation); } } + catch (NotImplementedException) + { + LOGGER.Log(Level.Error, "Reaching EvaluatorOperation RECOVERY mode, however, there is no IDriverConnection implemented for HA."); + throw; + } catch (Exception e) { // we do not want any exception to stop the query for driver status @@ -329,8 +357,8 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator private static long CurrentTimeMilliSeconds() { - // this is an implmenation to get current time milli second counted from Jan 1st, 1970 - // it is chose as such to be compatible with java implmentation + // this is an implementation to get current time in milliseconds counted from Jan 1st, 1970 + // it is chosen as such to be compatible with Java implementation DateTime jan1St1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); return (long)(DateTime.UtcNow - jan1St1970).TotalMilliseconds; } @@ -367,7 +395,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator { if (firstHeartbeatInQueue) { - // first heartbeat is specially construted to include the recovery flag + // first heartbeat is specially constructed to include the recovery flag EvaluatorHeartbeatProto recoveryHeartbeat = ConstructRecoveryHeartBeat(_queuedHeartbeats.Dequeue()); LOGGER.Log(Level.Info, "Recovery heartbeat to be sent:" + recoveryHeartbeat); _observer.OnNext(new REEFMessage(recoveryHeartbeat)); @@ -394,7 +422,7 @@ namespace Org.Apache.REEF.Common.Runtime.Evaluator _evaluatorSettings.OperationState = EvaluatorOperationState.OPERATIONAL; ContextManager.HandleDriverConnectionMessage(new DriverConnectionMessageImpl(DriverConnectionState.Reconnected)); - LOGGER.Log(Level.Info, "=========== Exiting RECOVERY mode. 
==========="); + LOGGER.Log(Level.Info, "Exiting RECOVERY mode!!!"); } private EvaluatorHeartbeatProto ConstructRecoveryHeartBeat(EvaluatorHeartbeatProto heartbeat) http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs index 2211a3e..bd836db 100644 --- a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetry.cs @@ -19,7 +19,7 @@ using Org.Apache.REEF.Tang.Annotations; namespace Org.Apache.REEF.Common.Runtime.Evaluator.Parameters { - [NamedParameter(Documentation = "Heartbeat Max Retry", ShortName = "HeartbeatMaxRetry", DefaultValue = "3")] + [NamedParameter(Documentation = "Max number of retries for sending heartbeat to driver before evaluator enters recovery mode to reconnect with driver.", ShortName = "HeartbeatMaxRetry", DefaultValue = "10")] internal sealed class HeartbeatMaxRetry : Name<int> { } http://git-wip-us.apache.org/repos/asf/reef/blob/dd14f091/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs new file mode 100644 index 0000000..b0bc06c --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Common/Runtime/Evaluator/Parameters/HeartbeatMaxRetryForNonRecoveryMode.cs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Common.Runtime.Evaluator.Parameters +{ + [NamedParameter(Documentation = "Max number of retries for sending heartbeat to driver if driver reconnection logic is not implemented.", ShortName = "HeartbeatMaxRetryForNonRecovery", DefaultValue = "60")] + internal sealed class HeartbeatMaxRetryForNonRecoveryMode : Name<int> + { + } +}
