Repository: incubator-slider
Updated Branches:
  refs/heads/develop df964c370 -> f0cd53ef1


SLIDER-1188 Make AM agent heartbeat loss configurable / increase default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/f0cd53ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/f0cd53ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/f0cd53ef

Branch: refs/heads/develop
Commit: f0cd53ef1dfd540e5dfc86cb46e92e66d8f0773b
Parents: df964c3
Author: Billie Rinaldi <bil...@apache.org>
Authored: Tue Jan 24 15:07:05 2017 -0800
Committer: Billie Rinaldi <bil...@apache.org>
Committed: Tue Jan 24 15:07:05 2017 -0800

----------------------------------------------------------------------
 .../slider/providers/agent/AgentKeys.java       |  2 ++
 .../providers/agent/AgentProviderService.java   | 29 ++++++++++++++++++--
 .../providers/agent/HeartbeatMonitor.java       | 11 +++++++-
 .../providers/agent/TestHeartbeatMonitor.java   |  6 ++--
 4 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/f0cd53ef/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
index 9ea984c..c7f8df2 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentKeys.java
@@ -97,6 +97,8 @@ public interface AgentKeys {
   String PYTHON_EXE = "python";
   String CREATE_DEF_ZK_NODE = "create.default.zookeeper.node";
   String HEARTBEAT_MONITOR_INTERVAL = "heartbeat.monitor.interval";
+  String HEARTBEAT_LOST_INTERVAL = "heartbeat.lost.interval";
+  int DEFAULT_HEARTBEAT_LOST_INTERVAL = 2 * 60 * 60 * 1000; // 2 hours
   String AGENT_INSTANCE_DEBUG_DATA = "agent.instance.debug.data";
   String AGENT_OUT_FILE = "slider-agent.out";
   String KEY_AGENT_TWO_WAY_SSL_ENABLED = "ssl.server.client.auth";

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/f0cd53ef/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 2ab5c6f..7f3b04e 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -172,6 +172,7 @@ public class AgentProviderService extends AbstractProviderService implements
   private final Object syncLock = new Object();
   private final ComponentTagProvider tags = new ComponentTagProvider();
   private int heartbeatMonitorInterval = 0;
+  private int heartbeatLostInterval = 0;
   private AgentClientProvider clientProvider;
   private AtomicInteger taskId = new AtomicInteger(0);
   private volatile Map<String, MetainfoHolder> metaInfoMap = new HashMap<>();
@@ -230,6 +231,7 @@ public class AgentProviderService extends AbstractProviderService implements
     super("AgentProviderService");
     setAgentRestOperations(this);
     setHeartbeatMonitorInterval(DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
+    setHeartbeatLostInterval(DEFAULT_HEARTBEAT_LOST_INTERVAL);
   }
 
   @Override
@@ -345,7 +347,8 @@ public class AgentProviderService extends AbstractProviderService implements
           Map<String, DefaultConfig> defaultConfigs =
               initializeDefaultConfigs(fileSystem, appDef, metaInfo);
           metaInfoMap.put(mapKey, new MetainfoHolder(metaInfo, defaultConfigs));
-          monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval());
+          monitor = new HeartbeatMonitor(this, getHeartbeatMonitorInterval(),
+              getHeartbeatLostInterval());
           monitor.start();
 
           // build a map from component to metainfo
@@ -1555,7 +1558,8 @@ public class AgentProviderService extends AbstractProviderService implements
   }
 
   /**
-   * Reads and sets the heartbeat monitoring interval. If bad value is provided then log it and set to default.
+   * Reads and sets the heartbeat monitoring interval and heartbeat lost
+   * interval. If bad value is provided then log it and set to default.
    *
    * @param instanceDefinition
    */
@@ -1572,6 +1576,18 @@ public class AgentProviderService extends AbstractProviderService implements
           HEARTBEAT_MONITOR_INTERVAL,
           DEFAULT_HEARTBEAT_MONITOR_INTERVAL);
     }
+    String hbLostInterval = instanceDefinition.getAppConfOperations().
+        getGlobalOptions().getOption(AgentKeys.HEARTBEAT_LOST_INTERVAL,
+        Integer.toString(DEFAULT_HEARTBEAT_LOST_INTERVAL));
+    try {
+      setHeartbeatLostInterval(Integer.parseInt(hbLostInterval));
+    } catch (NumberFormatException e) {
+      // Unparseable option: the current interval is left as-is; log the default.
+      log.warn(
+          "Bad value {} for {}. Defaulting to {}",
+          hbLostInterval, HEARTBEAT_LOST_INTERVAL,
+          DEFAULT_HEARTBEAT_LOST_INTERVAL);
+    }
   }
 
   /**
@@ -1637,6 +1653,11 @@ public class AgentProviderService extends AbstractProviderService implements
     this.heartbeatMonitorInterval = heartbeatMonitorInterval;
   }
 
+  @VisibleForTesting
+  protected void setHeartbeatLostInterval(int heartbeatLostInterval) {
+    this.heartbeatLostInterval = heartbeatLostInterval;
+  }
+
   public void setInUpgradeMode(boolean inUpgradeMode) {
     this.isInUpgradeMode = inUpgradeMode;
   }
@@ -1692,6 +1713,10 @@ public class AgentProviderService extends AbstractProviderService implements
     return this.heartbeatMonitorInterval;
   }
 
+  private int getHeartbeatLostInterval() {
+    return this.heartbeatLostInterval;
+  }
+
   private String getClusterName() {
     if (SliderUtils.isUnset(clusterName)) {
       clusterName = getAmState().getInternalsSnapshot().get(OptionKeys.APPLICATION_NAME);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/f0cd53ef/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java b/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java
index 4293916..80aea2d 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/HeartbeatMonitor.java
@@ -25,18 +25,27 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static org.apache.slider.providers.agent.AgentKeys.DEFAULT_HEARTBEAT_LOST_INTERVAL;
+
 /** Monitors the container state and heartbeats. */
 public class HeartbeatMonitor implements Runnable {
   protected static final Logger log =
       LoggerFactory.getLogger(HeartbeatMonitor.class);
   private final int threadWakeupInterval; //1 minute
+  private final int heartbeatLostInterval; // milliseconds; default is 2 hours
   private final AgentProviderService provider;
   private volatile boolean shouldRun = true;
   private Thread monitorThread = null;
 
   public HeartbeatMonitor(AgentProviderService provider, int threadWakeupInterval) {
+    this(provider, threadWakeupInterval, DEFAULT_HEARTBEAT_LOST_INTERVAL);
+  }
+
+  public HeartbeatMonitor(AgentProviderService provider,
+      int threadWakeupInterval, int heartbeatLostInterval) {
     this.provider = provider;
     this.threadWakeupInterval = threadWakeupInterval;
+    this.heartbeatLostInterval = heartbeatLostInterval;
   }
 
   public void shutdown() {
@@ -105,7 +114,7 @@ public class HeartbeatMonitor implements Runnable {
                   timeSinceLastHeartbeat);
               break;
             case UNHEALTHY:
-              if (timeSinceLastHeartbeat > threadWakeupInterval * 2) {
+              if (timeSinceLastHeartbeat > heartbeatLostInterval) {
                 componentInstanceState.setContainerState(
                     ContainerState.HEARTBEAT_LOST);
                 log.warn(

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/f0cd53ef/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java
index 7314b72..5b9b538 100644
--- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java
+++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestHeartbeatMonitor.java
@@ -40,7 +40,7 @@ public class TestHeartbeatMonitor {
   @Test
   public void testRegularHeartbeat() throws Exception {
     AgentProviderService provider = createNiceMock(AgentProviderService.class);
-    HeartbeatMonitor hbm = new HeartbeatMonitor(provider, 1 * 1000);
+    HeartbeatMonitor hbm = new HeartbeatMonitor(provider, 1 * 1000, 2 * 1000);
     Assert.assertFalse(hbm.isAlive());
     expect(provider.getComponentStatuses()).andReturn(null).anyTimes();
     replay(provider);
@@ -54,7 +54,7 @@ public class TestHeartbeatMonitor {
   @Test
   public void testHeartbeatMonitorWithHealthy() throws Exception {
     AgentProviderService provider = createNiceMock(AgentProviderService.class);
-    HeartbeatMonitor hbm = new HeartbeatMonitor(provider, 500);
+    HeartbeatMonitor hbm = new HeartbeatMonitor(provider, 500, 2 * 500);
     Assert.assertFalse(hbm.isAlive());
     Map<String, ComponentInstanceState> statuses = new HashMap<String, ComponentInstanceState>();
     ContainerId container1 = new MockContainerId(1);
@@ -101,7 +101,7 @@ public class TestHeartbeatMonitor {
 
 
     HeartbeatMonitor heartbeatMonitor = new HeartbeatMonitor(provider,
-        wakeupInterval);
+        wakeupInterval, 2 * wakeupInterval);
     Assert.assertFalse(heartbeatMonitor.isAlive());
     now += wakeupInterval;
     masterState.setState(State.STARTED);

Reply via email to