Repository: incubator-slider
Updated Branches:
  refs/heads/develop 7bad0eafb -> 90c9d3e22
SLIDER-438 Slider agent continues to run in the container on a node where NM dies Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/90c9d3e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/90c9d3e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/90c9d3e2 Branch: refs/heads/develop Commit: 90c9d3e2282a88ba7e00c49975bb72b311e88273 Parents: 7bad0ea Author: Gour Saha <[email protected]> Authored: Wed Oct 15 01:29:10 2014 -0700 Committer: Gour Saha <[email protected]> Committed: Wed Oct 15 01:29:10 2014 -0700 ---------------------------------------------------------------------- .../src/main/python/agent/ActionQueue.py | 6 + .../src/main/python/agent/Controller.py | 116 +++++++++++++++---- .../providers/agent/AgentProviderService.java | 36 ++++++ .../apache/slider/providers/agent/Command.java | 6 +- .../web/rest/agent/HeartBeatResponse.java | 12 ++ 5 files changed, 155 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-agent/src/main/python/agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/ActionQueue.py b/slider-agent/src/main/python/agent/ActionQueue.py index 4c45a76..c0dbb91 100644 --- a/slider-agent/src/main/python/agent/ActionQueue.py +++ b/slider-agent/src/main/python/agent/ActionQueue.py @@ -154,6 +154,12 @@ class ActionQueue(threading.Thread): 'tmperr'], True, store_config or store_command) + # If command is STOP then set flag to indicate stop has been triggered. 
+ # In future we might check status of STOP command and take other measures + # if graceful STOP fails (like force kill the processes) + if command['roleCommand'] == 'STOP': + self.controller.appGracefulStopTriggered = True + # dumping results status = self.COMPLETED_STATUS if commandresult[Constants.EXIT_CODE] != 0: http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 77f932c..39320fa 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -89,6 +89,10 @@ class Controller(threading.Thread): self.heartBeatRetryCount = 0 self.autoRestartFailures = 0 self.autoRestartTrackingSince = 0 + self.terminateAgent = False + self.stopCommand = None + self.appGracefulStopQueued = False + self.appGracefulStopTriggered = False def __del__(self): @@ -190,15 +194,48 @@ class Controller(threading.Thread): def shouldStopAgent(self): ''' - If component has failed after start then stop the agent + Stop the agent if: + - Component has failed after start + - AM sent terminate agent command ''' + shouldStopAgent = False if (self.componentActualState == State.FAILED) \ and (self.componentExpectedState == State.STARTED) \ and (self.failureCount >= Controller.MAX_FAILURE_COUNT_TO_STOP): - return True - else: + logger.info("Component instance has stopped, stopping the agent ...") + shouldStopAgent = True + if self.terminateAgent: + logger.info("Terminate agent command received from AM, stopping the agent ...") + shouldStopAgent = True + return shouldStopAgent + + def isAppGracefullyStopped(self): + ''' + If an app graceful stop command was queued then it is considered stopped if: + - app stop was triggered + + Note: We should enhance this method by checking if the app is stopped 
+ successfully and if not, then take alternate measures (like kill + processes). For now if stop is triggered it is considered stopped. + ''' + if not self.appGracefulStopQueued: return False - pass + isAppStopped = False + if self.appGracefulStopTriggered: + isAppStopped = True + return isAppStopped + + def stopApp(self): + ''' + Stop the app if: + - the app is currently in STARTED state and + a valid stop command is provided + ''' + if (self.componentActualState == State.STARTED) and (not self.stopCommand == None): + # Try to do graceful stop + self.addToQueue([self.stopCommand]) + self.appGracefulStopQueued = True + logger.info("Attempting to gracefully stop the application ...") def heartbeatWithServer(self): self.DEBUG_HEARTBEAT_RETRIES = 0 @@ -210,12 +247,14 @@ class Controller(threading.Thread): while not self.DEBUG_STOP_HEARTBEATING: - if self.shouldStopAgent(): - logger.info("Component instance has stopped, stopping the agent ...") - ProcessHelper.stopAgent() - commandResult = {} try: + if self.appGracefulStopQueued and not self.isAppGracefullyStopped(): + # Continue to wait until app is stopped + continue + if self.shouldStopAgent(): + ProcessHelper.stopAgent() + if not retry: data = json.dumps( self.heartbeat.build(commandResult, @@ -232,6 +271,20 @@ class Controller(threading.Thread): serverId = int(response['responseId']) + if 'restartAgent' in response.keys(): + restartAgent = response['restartAgent'] + if restartAgent: + logger.error("Got restartAgent command") + self.restartAgent() + if 'terminateAgent' in response.keys(): + terminateAgent = response['terminateAgent'] + if terminateAgent: + logger.error("Got terminateAgent command") + self.terminateAgent = True + self.stopApp() + # Continue will add some wait time + continue + restartEnabled = False if 'restartEnabled' in response: restartEnabled = response['restartEnabled'] @@ -257,17 +310,18 @@ class Controller(threading.Thread): else: self.responseId = serverId + commandSentFromAM = False if 
'executionCommands' in response.keys(): self.updateStateBasedOnCommand(response['executionCommands']) self.addToQueue(response['executionCommands']) + commandSentFromAM = True pass if 'statusCommands' in response.keys() and len(response['statusCommands']) > 0: self.addToQueue(response['statusCommands']) + commandSentFromAM = True pass - if "true" == response['restartAgent']: - logger.error("Got restartAgent command") - self.restartAgent() - else: + + if not commandSentFromAM: logger.info("No commands sent from the Server.") pass @@ -344,13 +398,14 @@ class Controller(threading.Thread): return self.cachedconnect = None # Previous connection is broken now retry = True - # Sleep for some time - timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \ - - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS - self.heartbeat_wait_event.wait(timeout=timeout) - # Sleep a bit more to allow STATUS_COMMAND results to be collected - # and sent in one heartbeat. Also avoid server overload with heartbeats - time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) + finally: + # Sleep for some time + timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \ + - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS + self.heartbeat_wait_event.wait(timeout=timeout) + # Sleep a bit more to allow STATUS_COMMAND results to be collected + # and sent in one heartbeat. Also avoid server overload with heartbeats + time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS) pass logger.info("Controller stopped heart-beating.") @@ -366,6 +421,17 @@ class Controller(threading.Thread): def updateStateBasedOnCommand(self, commands, createStatus=True): + # A STOP command is paired with the START command to provide agents the + # capability to gracefully stop the app if possible. The STOP command needs + # to be stored since the AM might not be able to provide it since it could + # have lost the container state for whatever reasons. 
The STOP command has + # no other role to play in the Agent state transition so it is removed from + # the commands list. + index = 0 + deleteIndex = 0 + delete = False + # break only if an INSTALL command is found, since we might get a STOP + # command for a START command for command in commands: if command["roleCommand"] == "START": self.componentExpectedState = State.STARTED @@ -374,12 +440,22 @@ class Controller(threading.Thread): if createStatus: self.statusCommand = self.createStatusCommand(command) + # The STOP command index is stored to be deleted + if command["roleCommand"] == "STOP": + self.stopCommand = command + delete = True + deleteIndex = index + if command["roleCommand"] == "INSTALL": self.componentExpectedState = State.INSTALLED self.componentActualState = State.INSTALLING self.failureCount = 0 - break; + break; + index += 1 + # Delete the STOP command + if delete: + del commands[deleteIndex] def updateStateBasedOnResult(self, commandResult): if len(commandResult) > 0: http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index 44777c3..19b5ddd 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -536,6 +536,11 @@ public class AgentProviderService extends AbstractProviderService implements } if (!getComponentStatuses().containsKey(label)) { + // container is completed but still heart-beating, send terminate signal + log.info( + "Sending terminate signal to completed container (still heartbeating): {}", + label); + response.setTerminateAgent(true); return 
response; } @@ -1384,6 +1389,37 @@ public class AgentProviderService extends AbstractProviderService implements cmd.setConfigurations(configurations); response.addExecutionCommand(cmd); + + // With start command, the corresponding command for graceful stop needs to + // be sent. This will be used when a particular container is lost as per RM, + // but then the agent is still running and heart-beating to the Slider AM. + ExecutionCommand cmdStop = new ExecutionCommand( + AgentCommandType.EXECUTION_COMMAND); + cmdStop.setTaskId(taskId.get()); + cmdStop.setCommandId(cmdStop.getTaskId() + "-1"); + cmdStop.setHostname(hostName); + cmdStop.setClusterName(clusterName); + cmdStop.setRoleCommand(Command.STOP.toString()); + cmdStop.setServiceName(clusterName); + cmdStop.setComponentName(roleName); + cmdStop.setRole(roleName); + Map<String, String> hostLevelParamsStop = new TreeMap<String, String>(); + hostLevelParamsStop.put(JAVA_HOME, appConf.getGlobalOptions() + .getMandatoryOption(JAVA_HOME)); + hostLevelParamsStop.put(CONTAINER_ID, containerId); + cmdStop.setHostLevelParams(hostLevelParamsStop); + + Map<String, String> roleParamsStop = new TreeMap<String, String>(); + cmdStop.setRoleParams(roleParamsStop); + cmdStop.getRoleParams().put("auto_restart", + Boolean.toString(isMarkedAutoRestart)); + + cmdStop.setCommandParams(setCommandParameters(scriptPath, timeout, true)); + + Map<String, Map<String, String>> configurationsStop = buildCommandConfigurations( + appConf, containerId); + cmdStop.setConfigurations(configurationsStop); + response.addExecutionCommand(cmdStop); } protected Map<String, String> getAllocatedPorts() { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java 
b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java index cbeb69d..a851803 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java @@ -22,7 +22,8 @@ package org.apache.slider.providers.agent; public enum Command { NOP, // do nothing INSTALL, // Install the component - START; // Start the component + START, // Start the component + STOP; // Stop the component public static Command getCommand(String commandVal) { if (commandVal.equals(Command.START.toString())) { @@ -31,6 +32,9 @@ public enum Command { if (commandVal.equals(Command.INSTALL.toString())) { return Command.INSTALL; } + if (commandVal.equals(Command.STOP.toString())) { + return Command.STOP; + } return Command.NOP; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/90c9d3e2/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java index 0545499..c118840 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/web/rest/agent/HeartBeatResponse.java @@ -42,6 +42,7 @@ public class HeartBeatResponse { boolean restartAgent = false; boolean restartEnabled = true; boolean hasMappedComponents = false; + boolean terminateAgent = false; @JsonProperty("responseId") public long getResponseId() { @@ -113,6 +114,16 @@ public class HeartBeatResponse { this.hasMappedComponents = hasMappedComponents; } + @JsonProperty("terminateAgent") + public boolean isTerminateAgent() { + return terminateAgent; + } + + @JsonProperty("terminateAgent") + 
public void setTerminateAgent(boolean terminateAgent) { + this.terminateAgent = terminateAgent; + } + public void addExecutionCommand(ExecutionCommand execCmd) { executionCommands.add(execCmd); } @@ -129,6 +140,7 @@ public class HeartBeatResponse { ", statusCommands=" + statusCommands + ", registrationCommand=" + registrationCommand + ", restartAgent=" + restartAgent + + ", terminateAgent=" + terminateAgent + '}'; } }
