Repository: incubator-slider Updated Branches: refs/heads/develop 4ab80d80a -> d850dd28c
SLIDER-128 Support graceful stop of component instances Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/d850dd28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/d850dd28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/d850dd28 Branch: refs/heads/develop Commit: d850dd28c3df4613c9eebaee0bee8a47abcee672 Parents: 4ab80d8 Author: Gour Saha <[email protected]> Authored: Thu Nov 12 08:32:13 2015 -0800 Committer: Gour Saha <[email protected]> Committed: Thu Nov 12 08:32:55 2015 -0800 ---------------------------------------------------------------------- .../src/main/python/agent/ActionQueue.py | 1 + .../src/main/python/agent/Controller.py | 14 ++++ slider-agent/src/main/python/agent/main.py | 4 +- .../org/apache/slider/common/SliderKeys.java | 6 ++ .../providers/agent/AgentProviderService.java | 13 +++ .../providers/agent/ComponentInstanceState.java | 20 +++-- .../apache/slider/providers/agent/State.java | 8 +- .../server/appmaster/SliderAppMaster.java | 40 ++++++++- .../agent/TestAgentProviderService.java | 87 ++++++++++++++++++++ .../slider/providers/agent/TestState.java | 33 ++++++++ 10 files changed, 215 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-agent/src/main/python/agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/ActionQueue.py b/slider-agent/src/main/python/agent/ActionQueue.py index ca68d5d..497d4f4 100644 --- a/slider-agent/src/main/python/agent/ActionQueue.py +++ b/slider-agent/src/main/python/agent/ActionQueue.py @@ -185,6 +185,7 @@ class ActionQueue(threading.Thread): # In future we might check status of STOP command and take other measures # if graceful STOP fails (like force kill the processes) if command['roleCommand'] == 'STOP': + logger.info("Stop command received") self.controller.appGracefulStopTriggered = True # dumping results http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index e99b78c..ec3bed7 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -214,6 +214,9 @@ class Controller(threading.Thread): if (self.componentActualState == State.FAILED) \ and (self.componentExpectedState == State.STARTED) \ and (self.failureCount >= Controller.MAX_FAILURE_COUNT_TO_STOP): + logger.info("Component instance has failed, stopping the agent ...") + shouldStopAgent = True + if (self.componentActualState == State.STOPPED): logger.info("Component instance has stopped, stopping the agent ...") shouldStopAgent = True if self.terminateAgent: @@ -272,6 +275,8 @@ class Controller(threading.Thread): try: if self.appGracefulStopQueued and not self.isAppGracefullyStopped(): # Continue to wait until app is stopped + logger.info("Graceful stop in progress..") + time.sleep(1) continue if self.shouldStopAgent(): ProcessHelper.stopAgent() @@ -467,9 +472,18 @@ class Controller(threading.Thread): # The STOP command index is stored to be deleted if command["roleCommand"] == "STOP": + logger.info("Got stop command = %s", (command)) self.stopCommand = command + ''' + If app is already running then stopApp() will initiate graceful stop + ''' + self.stopApp() delete = True deleteIndex = index + if self.componentActualState == State.STARTED: + self.componentExpectedState = State.STOPPED + self.componentActualState = State.STOPPING + self.failureCount = 0 if command["roleCommand"] == "INSTALL": self.componentExpectedState = State.INSTALLED http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-agent/src/main/python/agent/main.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/main.py b/slider-agent/src/main/python/agent/main.py index bfd4a27..68f46b7 100644 --- a/slider-agent/src/main/python/agent/main.py +++ b/slider-agent/src/main/python/agent/main.py @@ -56,7 +56,9 @@ def signal_handler(signum, frame): logger.info('signal received, exiting.') global controller if controller is not None and hasattr(controller, 'actionQueue'): - tmpdir = controller.actionQueue.dockerManager.stop_container() + docker_mode = controller.actionQueue.docker_mode + if docker_mode: + tmpdir = controller.actionQueue.dockerManager.stop_container() ProcessHelper.stopAgent() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java index 22798e3..1d2d5f8 100644 --- a/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java +++ b/slider-core/src/main/java/org/apache/slider/common/SliderKeys.java @@ -76,6 +76,12 @@ public interface SliderKeys extends SliderXmlConfKeys { String APP_VERSION_UNKNOWN = "awaiting heartbeat..."; /** + * Keys for application container specific properties, like release timeout + */ + String APP_CONTAINER_RELEASE_TIMEOUT = "site.global.app_container.release_timeout_secs"; + int APP_CONTAINER_HEARTBEAT_INTERVAL_SEC = 10; // look for HEARTBEAT_IDDLE_INTERVAL_SEC + + /** * JVM arg to force IPv4 {@value} */ String JVM_ENABLE_ASSERTIONS = "-ea"; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index e3dc791..7e3e87b 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -173,6 +173,7 @@ public class AgentProviderService extends AbstractProviderService implements private String clusterName = null; private boolean isInUpgradeMode; private Set<String> upgradeContainers = new HashSet<String>(); + private boolean appStopInitiated; private final Map<String, ComponentInstanceState> componentStatuses = new ConcurrentHashMap<String, ComponentInstanceState>(); @@ -879,6 +880,12 @@ public class AgentProviderService extends AbstractProviderService implements componentStatus.getState(), componentStatus.getTargetState()); } + if (appStopInitiated && !componentStatus.isStopInitiated()) { + log.info("Stop initiated for label {}", label); + componentStatus.setTargetState(State.STOPPED); + componentStatus.setStopInitiated(true); + } + publishConfigAndExportGroups(heartBeat, componentStatus, roleName); CommandResult result = null; List<CommandReport> reports = heartBeat.getReports(); @@ -1001,6 +1008,8 @@ public class AgentProviderService extends AbstractProviderService implements timeout); componentStatus.commandIssued(command, true); } else if (command == Command.STOP) { + log.info("Stop command being sent to container with id {}", + containerId); addStopCommand(roleName, containerId, response, scriptPath, timeout, doUpgrade); componentStatus.commandIssued(command); @@ -1276,6 +1285,10 @@ public class AgentProviderService extends AbstractProviderService implements this.upgradeContainers.addAll(upgradeContainers); } + public void setAppStopInitiated(boolean appStopInitiated) { + this.appStopInitiated = appStopInitiated; + } + /** * Read all default configs * http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java index c4a694e..55fdba6 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java @@ -51,6 +51,8 @@ public class ComponentInstanceState { private Map<String, State> pkgStatuses; private String nextPkgToInstall; + private boolean stopInitiated; + public ComponentInstanceState(String componentName, ContainerId containerId, String applicationId) { @@ -234,7 +236,7 @@ public class ComponentInstanceState { return Command.INSTALL_ADDON; } } - return this.state.getSupportedCommand(isInUpgradeMode); + return this.state.getSupportedCommand(isInUpgradeMode, stopInitiated); } public State getState() { @@ -254,6 +256,18 @@ public class ComponentInstanceState { this.targetState = targetState; } + public String getNextPkgToInstall() { + return nextPkgToInstall; + } + + public boolean isStopInitiated() { + return stopInitiated; + } + + public void setStopInitiated(boolean stopInitiated) { + this.stopInitiated = stopInitiated; + } + @Override public int hashCode() { int hashCode = 1; @@ -303,8 +317,4 @@ public class ComponentInstanceState { sb.append('}'); return sb.toString(); } - - public String getNextPkgToInstall() { - return nextPkgToInstall; - } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-core/src/main/java/org/apache/slider/providers/agent/State.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/State.java b/slider-core/src/main/java/org/apache/slider/providers/agent/State.java index 11105fb..5603f8d 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/State.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/State.java @@ -61,6 +61,11 @@ public enum State { } public Command getSupportedCommand(boolean isInUpgradeMode) { + return getSupportedCommand(isInUpgradeMode, false); + } + + public Command getSupportedCommand(boolean isInUpgradeMode, + boolean stopInitiated) { switch (this) { case INIT: case INSTALL_FAILED: @@ -68,7 +73,8 @@ public enum State { case INSTALLED: return Command.START; case STARTED: - return isInUpgradeMode ? Command.UPGRADE : Command.NOP; + return isInUpgradeMode ? Command.UPGRADE : (stopInitiated) ? Command.STOP + : Command.NOP; case UPGRADED: return Command.STOP; case STOPPED: http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 3d062b5..fd9253e 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -1818,10 +1818,42 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * Shutdown operation: release all containers */ private void releaseAllContainers() { - List<AbstractRMOperation> operations = appState.releaseAllContainers(); - providerRMOperationHandler.execute(operations); - //now apply the operations - execute(operations); + if (providerService instanceof AgentProviderService) { + log.info("Setting stopInitiated flag to true"); + AgentProviderService agentProviderService = (AgentProviderService) providerService; + agentProviderService.setAppStopInitiated(true); + } + // Add the sleep here (before releasing containers) so that applications get + // time to perform graceful shutdown + try { + long timeout = getContainerReleaseTimeout(); + if (timeout > 0) { + Thread.sleep(timeout); + } + } catch (InterruptedException e) { + log.info("Sleep for container release interrupted"); + } finally { + List<AbstractRMOperation> operations = appState.releaseAllContainers(); + providerRMOperationHandler.execute(operations); + // now apply the operations + execute(operations); + } + } + + private long getContainerReleaseTimeout() { + // Get container release timeout in millis or 0 if the property is not set. + // If non-zero then add the agent heartbeat delay time, since it can take up + // to that much time for agents to receive the stop command. + int timeout = getInstanceDefinition().getAppConfOperations() + .getGlobalOptions() + .getOptionInt(SliderKeys.APP_CONTAINER_RELEASE_TIMEOUT, 0); + if (timeout > 0) { + timeout += SliderKeys.APP_CONTAINER_HEARTBEAT_INTERVAL_SEC; + } + // convert to millis + long timeoutInMillis = timeout * 1000l; + log.info("Container release timeout in millis = {}", timeoutInMillis); + return timeoutInMillis; } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java index 9e1c135..0f31d73 100644 --- a/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestAgentProviderService.java @@ -1718,6 +1718,93 @@ public class TestAgentProviderService { } @Test + public void testAddStopCommand() throws Exception { + AgentProviderService aps = createAgentProviderService(new Configuration()); + HeartBeatResponse hbr = new HeartBeatResponse(); + + StateAccessForProviders access = createNiceMock(StateAccessForProviders.class); + AgentProviderService mockAps = Mockito.spy(aps); + doReturn(access).when(mockAps).getAmState(); + + AggregateConf aggConf = new AggregateConf(); + ConfTreeOperations treeOps = aggConf.getAppConfOperations(); + treeOps.getGlobalOptions().put(AgentKeys.JAVA_HOME, "java_home"); + treeOps.set(OptionKeys.APPLICATION_NAME, "HBASE"); + treeOps.set("site.fs.defaultFS", "hdfs://HOST1:8020/"); + treeOps.set("internal.data.dir.path", "hdfs://HOST1:8020/database"); + treeOps.set(OptionKeys.ZOOKEEPER_HOSTS, "HOST1"); + treeOps.getGlobalOptions().put("site.hbase-site.a.port", "${HBASE_MASTER.ALLOCATED_PORT}"); + treeOps.getGlobalOptions().put("site.hbase-site.b.port", "${HBASE_MASTER.ALLOCATED_PORT}"); + treeOps.getGlobalOptions().put("site.hbase-site.random.port", "${HBASE_MASTER.ALLOCATED_PORT}{PER_CONTAINER}"); + treeOps.getGlobalOptions().put("site.hbase-site.random2.port", "${HBASE_MASTER.ALLOCATED_PORT}"); + + Map<String, DefaultConfig> defaultConfigMap = new HashMap<String, DefaultConfig>(); + DefaultConfig defaultConfig = new DefaultConfig(); + PropertyInfo propertyInfo1 = new PropertyInfo(); + propertyInfo1.setName("defaultA"); + propertyInfo1.setValue("Avalue"); + defaultConfig.addPropertyInfo(propertyInfo1); + propertyInfo1 = new PropertyInfo(); + propertyInfo1.setName("defaultB"); + propertyInfo1.setValue(""); + defaultConfig.addPropertyInfo(propertyInfo1); + defaultConfigMap.put("hbase-site", defaultConfig); + + expect(access.getAppConfSnapshot()).andReturn(treeOps).anyTimes(); + expect(access.getInternalsSnapshot()).andReturn(treeOps).anyTimes(); + expect(access.isApplicationLive()).andReturn(true).anyTimes(); + + doReturn("HOST1").when(mockAps).getClusterInfoPropertyValue(anyString()); + doReturn(defaultConfigMap).when(mockAps).getDefaultConfigs(); + List<String> configurations = new ArrayList<String>(); + configurations.add("hbase-site"); + configurations.add("global"); + List<String> sysConfigurations = new ArrayList<String>(); + configurations.add("core-site"); + doReturn(configurations).when(mockAps).getApplicationConfigurationTypes(); + doReturn(sysConfigurations).when(mockAps).getSystemConfigurationsRequested(any(ConfTreeOperations.class)); + + Map<String, Map<String, ClusterNode>> roleClusterNodeMap = new HashMap<String, Map<String, ClusterNode>>(); + Map<String, ClusterNode> container = new HashMap<String, ClusterNode>(); + ClusterNode cn1 = new ClusterNode(new MockContainerId(1)); + cn1.host = "HOST1"; + container.put("cid1", cn1); + roleClusterNodeMap.put("HBASE_MASTER", container); + doReturn(roleClusterNodeMap).when(mockAps).getRoleClusterNodeMapping(); + Map<String, String> allocatedPorts = new HashMap<String, String>(); + allocatedPorts.put("hbase-site.a.port", "10023"); + allocatedPorts.put("hbase-site.b.port", "10024"); + doReturn(allocatedPorts).when(mockAps).getAllocatedPorts(); + Map<String, String> allocatedPorts2 = new HashMap<String, String>(); + allocatedPorts2.put("hbase-site.random.port", "10025"); + doReturn(allocatedPorts2).when(mockAps).getAllocatedPorts(anyString()); + + replay(access); + + mockAps.addStopCommand("HBASE_MASTER", "cid1", hbr, "/tmp/stop_cmd.sh", 10, false); + + Assert.assertTrue(hbr.getExecutionCommands().get(0).getConfigurations().containsKey("hbase-site")); + Assert.assertTrue(hbr.getExecutionCommands().get(0).getConfigurations().containsKey("core-site")); + Map<String, String> hbaseSiteConf = hbr.getExecutionCommands().get(0).getConfigurations().get("hbase-site"); + Assert.assertTrue(hbaseSiteConf.containsKey("a.port")); + Assert.assertEquals("10023", hbaseSiteConf.get("a.port")); + Assert.assertEquals("10024", hbaseSiteConf.get("b.port")); + Assert.assertEquals("10025", hbaseSiteConf.get("random.port")); + assertEquals("${HBASE_MASTER.ALLOCATED_PORT}", + hbaseSiteConf.get("random2.port")); + ExecutionCommand cmd = hbr.getExecutionCommands().get(0); + Assert.assertTrue(cmd.getConfigurations().get("global").containsKey("app_log_dir")); + Assert.assertTrue(cmd.getConfigurations().get("global").containsKey("app_pid_dir")); + Assert.assertTrue(cmd.getConfigurations().get("global").containsKey("app_install_dir")); + Assert.assertTrue(cmd.getConfigurations().get("global").containsKey("app_input_conf_dir")); + Assert.assertTrue(cmd.getConfigurations().get("global").containsKey("app_container_id")); + Assert.assertTrue(cmd.getConfigurations().get("global").containsKey("pid_file")); + Assert.assertTrue(cmd.getConfigurations().get("global").containsKey("app_root")); + Assert.assertTrue(cmd.getConfigurations().get("hbase-site").containsKey("defaultA")); + Assert.assertFalse(cmd.getConfigurations().get("hbase-site").containsKey("defaultB")); + } + + @Test public void testParameterParsing() throws IOException { AgentProviderService aps = createAgentProviderService(new Configuration()); AggregateConf aggConf = new AggregateConf(); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/d850dd28/slider-core/src/test/java/org/apache/slider/providers/agent/TestState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/test/java/org/apache/slider/providers/agent/TestState.java b/slider-core/src/test/java/org/apache/slider/providers/agent/TestState.java new file mode 100644 index 0000000..6a2e5ab --- /dev/null +++ b/slider-core/src/test/java/org/apache/slider/providers/agent/TestState.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.slider.providers.agent; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestState { + protected static final Logger log = LoggerFactory.getLogger(TestState.class); + + @Test + public void testState() throws Exception { + State state = State.STARTED; + Assert.assertEquals(Command.STOP, state.getSupportedCommand(false, true)); + } +}
