Repository: incubator-slider
Updated Branches:
  refs/heads/develop 7bad0eafb -> 90c9d3e22


SLIDER-438 Slider agent continues to run in the container on a node where NM dies


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/90c9d3e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/90c9d3e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/90c9d3e2

Branch: refs/heads/develop
Commit: 90c9d3e2282a88ba7e00c49975bb72b311e88273
Parents: 7bad0ea
Author: Gour Saha <[email protected]>
Authored: Wed Oct 15 01:29:10 2014 -0700
Committer: Gour Saha <[email protected]>
Committed: Wed Oct 15 01:29:10 2014 -0700

----------------------------------------------------------------------
 .../src/main/python/agent/ActionQueue.py        |   6 +
 .../src/main/python/agent/Controller.py         | 116 +++++++++++++++----
 .../providers/agent/AgentProviderService.java   |  36 ++++++
 .../apache/slider/providers/agent/Command.java  |   6 +-
 .../web/rest/agent/HeartBeatResponse.java       |  12 ++
 5 files changed, 155 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-agent/src/main/python/agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/ActionQueue.py b/slider-agent/src/main/python/agent/ActionQueue.py
index 4c45a76..c0dbb91 100644
--- a/slider-agent/src/main/python/agent/ActionQueue.py
+++ b/slider-agent/src/main/python/agent/ActionQueue.py
@@ -154,6 +154,12 @@ class ActionQueue(threading.Thread):
                                                                 'tmperr'],
                                                               True,
                                                               store_config or 
store_command)
+    # If command is STOP then set flag to indicate stop has been triggered.
+    # In future we might check status of STOP command and take other measures
+    # if graceful STOP fails (like force kill the processes)
+    if command['roleCommand'] == 'STOP':
+      self.controller.appGracefulStopTriggered = True
+
     # dumping results
     status = self.COMPLETED_STATUS
     if commandresult[Constants.EXIT_CODE] != 0:

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-agent/src/main/python/agent/Controller.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py
index 77f932c..39320fa 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -89,6 +89,10 @@ class Controller(threading.Thread):
     self.heartBeatRetryCount = 0
     self.autoRestartFailures = 0
     self.autoRestartTrackingSince = 0
+    self.terminateAgent = False
+    self.stopCommand = None
+    self.appGracefulStopQueued = False
+    self.appGracefulStopTriggered = False
 
 
   def __del__(self):
@@ -190,15 +194,48 @@ class Controller(threading.Thread):
 
   def shouldStopAgent(self):
     '''
-    If component has failed after start then stop the agent
+    Stop the agent if:
+      - Component has failed after start
+      - AM sent terminate agent command
     '''
+    shouldStopAgent = False
     if (self.componentActualState == State.FAILED) \
       and (self.componentExpectedState == State.STARTED) \
       and (self.failureCount >= Controller.MAX_FAILURE_COUNT_TO_STOP):
-      return True
-    else:
+      logger.info("Component instance has stopped, stopping the agent ...")
+      shouldStopAgent = True
+    if self.terminateAgent:
+      logger.info("Terminate agent command received from AM, stopping the 
agent ...")
+      shouldStopAgent = True
+    return shouldStopAgent
+
+  def isAppGracefullyStopped(self):
+    '''
+    If an app graceful stop command was queued then it is considered stopped 
if:
+      - app stop was triggered
+
+    Note: We should enhance this method by checking if the app is stopped
+          successfully and if not, then take alternate measures (like kill
+          processes). For now if stop is triggered it is considered stopped.
+    '''
+    if not self.appGracefulStopQueued:
       return False
-    pass
+    isAppStopped = False
+    if self.appGracefulStopTriggered:
+      isAppStopped = True
+    return isAppStopped
+
+  def stopApp(self):
+    '''
+    Stop the app if:
+      - the app is currently in STARTED state and
+        a valid stop command is provided
+    '''
+    if (self.componentActualState == State.STARTED) and (not self.stopCommand 
== None):
+      # Try to do graceful stop
+      self.addToQueue([self.stopCommand])
+      self.appGracefulStopQueued = True
+      logger.info("Attempting to gracefully stop the application ...")
 
   def heartbeatWithServer(self):
     self.DEBUG_HEARTBEAT_RETRIES = 0
@@ -210,12 +247,14 @@ class Controller(threading.Thread):
 
     while not self.DEBUG_STOP_HEARTBEATING:
 
-      if self.shouldStopAgent():
-        logger.info("Component instance has stopped, stopping the agent ...")
-        ProcessHelper.stopAgent()
-
       commandResult = {}
       try:
+        if self.appGracefulStopQueued and not self.isAppGracefullyStopped():
+          # Continue to wait until app is stopped
+          continue
+        if self.shouldStopAgent():
+          ProcessHelper.stopAgent()
+
         if not retry:
           data = json.dumps(
             self.heartbeat.build(commandResult,
@@ -232,6 +271,20 @@ class Controller(threading.Thread):
 
         serverId = int(response['responseId'])
 
+        if 'restartAgent' in response.keys():
+          restartAgent = response['restartAgent']
+          if restartAgent:
+            logger.error("Got restartAgent command")
+            self.restartAgent()
+        if 'terminateAgent' in response.keys():
+          terminateAgent = response['terminateAgent']
+          if terminateAgent:
+            logger.error("Got terminateAgent command")
+            self.terminateAgent = True
+            self.stopApp()
+            # Continue will add some wait time
+            continue
+
         restartEnabled = False
         if 'restartEnabled' in response:
           restartEnabled = response['restartEnabled']
@@ -257,17 +310,18 @@ class Controller(threading.Thread):
         else:
           self.responseId = serverId
 
+        commandSentFromAM = False
         if 'executionCommands' in response.keys():
           self.updateStateBasedOnCommand(response['executionCommands'])
           self.addToQueue(response['executionCommands'])
+          commandSentFromAM = True
           pass
         if 'statusCommands' in response.keys() and 
len(response['statusCommands']) > 0:
           self.addToQueue(response['statusCommands'])
+          commandSentFromAM = True
           pass
-        if "true" == response['restartAgent']:
-          logger.error("Got restartAgent command")
-          self.restartAgent()
-        else:
+
+        if not commandSentFromAM:
           logger.info("No commands sent from the Server.")
           pass
 
@@ -344,13 +398,14 @@ class Controller(threading.Thread):
           return
         self.cachedconnect = None # Previous connection is broken now
         retry = True
-      # Sleep for some time
-      timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
-                - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
-      self.heartbeat_wait_event.wait(timeout=timeout)
-      # Sleep a bit more to allow STATUS_COMMAND results to be collected
-      # and sent in one heartbeat. Also avoid server overload with heartbeats
-      time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
+      finally:
+        # Sleep for some time
+        timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
+                  - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
+        self.heartbeat_wait_event.wait(timeout=timeout)
+        # Sleep a bit more to allow STATUS_COMMAND results to be collected
+        # and sent in one heartbeat. Also avoid server overload with heartbeats
+        time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
     pass
     logger.info("Controller stopped heart-beating.")
 
@@ -366,6 +421,17 @@ class Controller(threading.Thread):
 
 
   def updateStateBasedOnCommand(self, commands, createStatus=True):
+    # A STOP command is paired with the START command to provide agents the
+    # capability to gracefully stop the app if possible. The STOP command needs
+    # to be stored since the AM might not be able to provide it since it could
+    # have lost the container state for whatever reasons. The STOP command has
+    # no other role to play in the Agent state transition so it is removed from
+    # the commands list.
+    index = 0
+    deleteIndex = 0
+    delete = False
+    # break only if an INSTALL command is found, since we might get a STOP
+    # command for a START command
     for command in commands:
       if command["roleCommand"] == "START":
         self.componentExpectedState = State.STARTED
@@ -374,12 +440,22 @@ class Controller(threading.Thread):
         if createStatus:
           self.statusCommand = self.createStatusCommand(command)
 
+      # The STOP command index is stored to be deleted
+      if command["roleCommand"] == "STOP":
+        self.stopCommand = command
+        delete = True
+        deleteIndex = index
+
       if command["roleCommand"] == "INSTALL":
         self.componentExpectedState = State.INSTALLED
         self.componentActualState = State.INSTALLING
         self.failureCount = 0
-      break;
+        break;
+      index += 1
 
+    # Delete the STOP command
+    if delete:
+      del commands[deleteIndex]
 
   def updateStateBasedOnResult(self, commandResult):
     if len(commandResult) > 0:

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 44777c3..19b5ddd 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -536,6 +536,11 @@ public class AgentProviderService extends AbstractProviderService implements
     }
 
     if (!getComponentStatuses().containsKey(label)) {
+      // container is completed but still heart-beating, send terminate signal
+      log.info(
+          "Sending terminate signal to completed container (still 
heartbeating): {}",
+          label);
+      response.setTerminateAgent(true);
       return response;
     }
 
@@ -1384,6 +1389,37 @@ public class AgentProviderService extends AbstractProviderService implements
 
     cmd.setConfigurations(configurations);
     response.addExecutionCommand(cmd);
+    
+    // With start command, the corresponding command for graceful stop needs to
+    // be sent. This will be used when a particular container is lost as per 
RM,
+    // but then the agent is still running and heart-beating to the Slider AM.
+    ExecutionCommand cmdStop = new ExecutionCommand(
+        AgentCommandType.EXECUTION_COMMAND);
+    cmdStop.setTaskId(taskId.get());
+    cmdStop.setCommandId(cmdStop.getTaskId() + "-1");
+    cmdStop.setHostname(hostName);
+    cmdStop.setClusterName(clusterName);
+    cmdStop.setRoleCommand(Command.STOP.toString());
+    cmdStop.setServiceName(clusterName);
+    cmdStop.setComponentName(roleName);
+    cmdStop.setRole(roleName);
+    Map<String, String> hostLevelParamsStop = new TreeMap<String, String>();
+    hostLevelParamsStop.put(JAVA_HOME, appConf.getGlobalOptions()
+        .getMandatoryOption(JAVA_HOME));
+    hostLevelParamsStop.put(CONTAINER_ID, containerId);
+    cmdStop.setHostLevelParams(hostLevelParamsStop);
+
+    Map<String, String> roleParamsStop = new TreeMap<String, String>();
+    cmdStop.setRoleParams(roleParamsStop);
+    cmdStop.getRoleParams().put("auto_restart",
+        Boolean.toString(isMarkedAutoRestart));
+
+    cmdStop.setCommandParams(setCommandParameters(scriptPath, timeout, true));
+
+    Map<String, Map<String, String>> configurationsStop = 
buildCommandConfigurations(
+        appConf, containerId);
+    cmdStop.setConfigurations(configurationsStop);
+    response.addExecutionCommand(cmdStop);
   }
 
   protected Map<String, String> getAllocatedPorts() {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
index cbeb69d..a851803 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
@@ -22,7 +22,8 @@ package org.apache.slider.providers.agent;
 public enum Command {
   NOP,      // do nothing
   INSTALL,  // Install the component
-  START;     // Start the component
+  START,    // Start the component
+  STOP;     // Stop the component
 
   public static Command getCommand(String commandVal) {
     if (commandVal.equals(Command.START.toString())) {
@@ -31,6 +32,9 @@ public enum Command {
     if (commandVal.equals(Command.INSTALL.toString())) {
       return Command.INSTALL;
     }
+    if (commandVal.equals(Command.STOP.toString())) {
+      return Command.STOP;
+    }
 
     return Command.NOP;
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java
index 0545499..c118840 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java
@@ -42,6 +42,7 @@ public class HeartBeatResponse {
   boolean restartAgent = false;
   boolean restartEnabled = true;
   boolean hasMappedComponents = false;
+  boolean terminateAgent = false;
 
   @JsonProperty("responseId")
   public long getResponseId() {
@@ -113,6 +114,16 @@ public class HeartBeatResponse {
     this.hasMappedComponents = hasMappedComponents;
   }
 
+  @JsonProperty("terminateAgent")
+  public boolean isTerminateAgent() {
+    return terminateAgent;
+  }
+
+  @JsonProperty("terminateAgent")
+  public void setTerminateAgent(boolean terminateAgent) {
+    this.terminateAgent = terminateAgent;
+  }
+
   public void addExecutionCommand(ExecutionCommand execCmd) {
     executionCommands.add(execCmd);
   }
@@ -129,6 +140,7 @@ public class HeartBeatResponse {
            ", statusCommands=" + statusCommands +
            ", registrationCommand=" + registrationCommand +
            ", restartAgent=" + restartAgent +
+           ", terminateAgent=" + terminateAgent +
            '}';
   }
 }

Reply via email to