Repository: incubator-slider Updated Branches: refs/heads/develop 566d1206a -> 7ae7d1532
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index d95230f..3ee71f2 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -165,6 +165,8 @@ public class AgentProviderService extends AbstractProviderService implements private Boolean canAnyMasterPublish = null; private AgentLaunchParameter agentLaunchParameter = null; private String clusterName = null; + private boolean isInUpgradeMode; + private Set<String> upgradeContainers = new HashSet<String>(); private final Map<String, ComponentInstanceState> componentStatuses = new ConcurrentHashMap<String, ComponentInstanceState>(); @@ -714,6 +716,10 @@ public class AgentProviderService extends AbstractProviderService implements String label = heartBeat.getHostname(); String roleName = getRoleName(label); String containerId = getContainerId(label); + boolean doUpgrade = false; + if (isInUpgradeMode && upgradeContainers.contains(containerId)) { + doUpgrade = true; + } StateAccessForProviders accessor = getAmState(); CommandScript cmdScript = getScriptPathFromMetainfo(roleName); @@ -748,6 +754,23 @@ public class AgentProviderService extends AbstractProviderService implements Boolean isMaster = isMaster(roleName); ComponentInstanceState componentStatus = getComponentStatuses().get(label); componentStatus.heartbeat(System.currentTimeMillis()); + if (doUpgrade) { + switch (componentStatus.getState()) { + case STARTED: + componentStatus.setTargetState(State.UPGRADED); + break; + case UPGRADED: + componentStatus.setTargetState(State.STOPPED); + break; + case STOPPED: + componentStatus.setTargetState(State.TERMINATING); + break; + default: + break; + } + log.info("Current state = {} target state {}", + componentStatus.getState(), componentStatus.getTargetState()); + } publishConfigAndExportGroups(heartBeat, componentStatus, roleName); @@ -761,8 +784,9 @@ public class AgentProviderService extends AbstractProviderService implements CommandResult result = CommandResult.getCommandResult(report.getStatus()); Command command = Command.getCommand(report.getRoleCommand()); componentStatus.applyCommandResult(result, command); - log.info("Component operation. Status: {}; new container state: {}", - result, componentStatus.getContainerState()); + log.info("Component operation. Status: {}; new container state: {};" + + " new component state: {}", result, + componentStatus.getContainerState(), componentStatus.getState()); if (command == Command.INSTALL && SliderUtils.isNotEmpty(report.getFolders())) { publishFolderPaths(report.getFolders(), containerId, roleName, heartBeat.getFqdn()); @@ -778,7 +802,7 @@ public class AgentProviderService extends AbstractProviderService implements return response; } - Command command = componentStatus.getNextCommand(); + Command command = componentStatus.getNextCommand(doUpgrade); try { if (Command.NOP != command) { if (command == Command.INSTALL) { @@ -830,6 +854,18 @@ public class AgentProviderService extends AbstractProviderService implements } else { log.info("Start of {} on {} delayed as dependencies have not started.", roleName, containerId); } + } else if (command == Command.UPGRADE) { + addUpgradeCommand(roleName, containerId, response, scriptPath, + timeout); + componentStatus.commandIssued(command, true); + } else if (command == Command.STOP) { + addStopCommand(roleName, containerId, response, scriptPath, timeout, + doUpgrade); + componentStatus.commandIssued(command); + } else if (command == Command.TERMINATE) { + log.info("A formal terminate command is being sent to container {}" + + " in state {}", label, componentStatus.getState()); + response.setTerminateAgent(true); } } @@ -1071,6 +1107,14 @@ public class AgentProviderService extends AbstractProviderService implements this.heartbeatMonitorInterval = heartbeatMonitorInterval; } + public void setInUpgradeMode(boolean inUpgradeMode) { + this.isInUpgradeMode = inUpgradeMode; + } + + public void addUpgradeContainers(List<String> upgradeContainers) { + this.upgradeContainers.addAll(upgradeContainers); + } + /** * Read all default configs * @@ -1929,6 +1973,72 @@ public class AgentProviderService extends AbstractProviderService implements response.addExecutionCommand(cmdStop); } + @VisibleForTesting + protected void addUpgradeCommand(String componentName, String containerId, + HeartBeatResponse response, String scriptPath, long timeout) + throws SliderException { + assert getAmState().isApplicationLive(); + ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); + ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); + + ExecutionCommand cmd = new ExecutionCommand( + AgentCommandType.EXECUTION_COMMAND); + prepareExecutionCommand(cmd); + String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String hostName = getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME); + cmd.setHostname(hostName); + cmd.setClusterName(clusterName); + cmd.setRoleCommand(Command.UPGRADE.toString()); + cmd.setServiceName(clusterName); + cmd.setComponentName(componentName); + cmd.setRole(componentName); + Map<String, String> hostLevelParams = new TreeMap<String, String>(); + hostLevelParams.put(JAVA_HOME, appConf.getGlobalOptions() + .getMandatoryOption(JAVA_HOME)); + hostLevelParams.put(CONTAINER_ID, containerId); + cmd.setHostLevelParams(hostLevelParams); + cmd.setCommandParams(commandParametersSet(scriptPath, timeout, true)); + + Map<String, Map<String, String>> configurations = buildCommandConfigurations( + appConf, containerId, componentName); + cmd.setConfigurations(configurations); + response.addExecutionCommand(cmd); + } + + protected void addStopCommand(String componentName, String containerId, + HeartBeatResponse response, String scriptPath, long timeout, + boolean isInUpgradeMode) throws SliderException { + assert getAmState().isApplicationLive(); + ConfTreeOperations appConf = getAmState().getAppConfSnapshot(); + ConfTreeOperations internalsConf = getAmState().getInternalsSnapshot(); + + ExecutionCommand cmdStop = new ExecutionCommand( + AgentCommandType.EXECUTION_COMMAND); + cmdStop.setTaskId(taskId.get()); + cmdStop.setCommandId(cmdStop.getTaskId() + "-1"); + String clusterName = internalsConf.get(OptionKeys.APPLICATION_NAME); + String hostName = getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME); + cmdStop.setHostname(hostName); + cmdStop.setClusterName(clusterName); + // Upgrade stop is differentiated by passing a transformed role command - + // UPGRADE_STOP + cmdStop.setRoleCommand(Command.transform(Command.STOP, isInUpgradeMode)); + cmdStop.setServiceName(clusterName); + cmdStop.setComponentName(componentName); + cmdStop.setRole(componentName); + Map<String, String> hostLevelParamsStop = new TreeMap<String, String>(); + hostLevelParamsStop.put(JAVA_HOME, appConf.getGlobalOptions() + .getMandatoryOption(JAVA_HOME)); + hostLevelParamsStop.put(CONTAINER_ID, containerId); + cmdStop.setHostLevelParams(hostLevelParamsStop); + cmdStop.setCommandParams(commandParametersSet(scriptPath, timeout, true)); + + Map<String, Map<String, String>> configurationsStop = buildCommandConfigurations( + appConf, containerId, componentName); + cmdStop.setConfigurations(configurationsStop); + response.addExecutionCommand(cmdStop); + } + protected static String getJDKDir() { File javaHome = new File(System.getProperty("java.home")).getParentFile(); File jdkDirectory = null; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java index a851803..7d13a8f 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java @@ -23,7 +23,9 @@ public enum Command { NOP, // do nothing INSTALL, // Install the component START, // Start the component - STOP; // Stop the component + STOP, // Stop the component + UPGRADE, // The component will undergo upgrade + TERMINATE;// Send terminate signal to agent public static Command getCommand(String commandVal) { if (commandVal.equals(Command.START.toString())) { @@ -35,7 +37,22 @@ public enum Command { if (commandVal.equals(Command.STOP.toString())) { return Command.STOP; } + if (commandVal.equals(Command.UPGRADE.toString())) { + return Command.UPGRADE; + } + if (commandVal.equals(Command.TERMINATE.toString())) { + return Command.TERMINATE; + } return Command.NOP; } + + public static String transform(Command command, boolean isUpgrade) { + switch (command) { + case STOP: + return isUpgrade ? "UPGRADE_STOP" : command.name(); + default: + return command.name(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java index a50f3c0..dd78278 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java @@ -98,7 +98,11 @@ public class ComponentInstanceState { } public void commandIssued(Command command) { - Command expected = getNextCommand(); + commandIssued(command, false); + } + + public void commandIssued(Command command, boolean isInUpgradeMode) { + Command expected = getNextCommand(isInUpgradeMode); if (expected != command) { throw new IllegalArgumentException("Command " + command + " is not allowed in state " + state); } @@ -139,11 +143,15 @@ public class ComponentInstanceState { } public Command getNextCommand() { + return getNextCommand(false); + } + + public Command getNextCommand(boolean isInUpgradeMode) { if (!hasPendingCommand()) { return Command.NOP; } - return this.state.getSupportedCommand(); + return this.state.getSupportedCommand(isInUpgradeMode); } public State getState() { @@ -155,6 +163,14 @@ public class ComponentInstanceState { this.state = state; } + public State getTargetState() { + return targetState; + } + + public void setTargetState(State targetState) { + this.targetState = targetState; + } + @Override public int hashCode() { int hashCode = 1; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/providers/agent/State.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/State.java b/slider-core/src/main/java/org/apache/slider/providers/agent/State.java index 09732a5..0738740 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/State.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/State.java @@ -25,7 +25,13 @@ public enum State { INSTALLED, // Installed (or stopped) STARTING, // Starting STARTED, // Started - INSTALL_FAILED; // Install failed, start failure in INSTALLED + INSTALL_FAILED, // Install failed, start failure in INSTALLED + UPGRADING, // Undergoing upgrade, perform necessary pre-upgrade steps + UPGRADED, // Pre-upgrade steps completed + STOPPING, // Stop has been issued + STOPPED, // Agent has stopped + TERMINATING; // Terminate signal to ask the agent to kill itself + // No need for state TERMINATED (as the agent is dead by then) /** * Indicates whether or not it is a valid state to produce a command. @@ -36,7 +42,9 @@ public enum State { switch (this) { case INSTALLING: case STARTING: - case STARTED: + case UPGRADING: + case STOPPING: + case TERMINATING: return false; default: return true; @@ -49,12 +57,22 @@ public enum State { * @return command allowed in this state. */ public Command getSupportedCommand() { + return getSupportedCommand(false); + } + + public Command getSupportedCommand(boolean isInUpgradeMode) { switch (this) { case INIT: case INSTALL_FAILED: return Command.INSTALL; case INSTALLED: return Command.START; + case STARTED: + return isInUpgradeMode ? Command.UPGRADE : Command.NOP; + case UPGRADED: + return Command.STOP; + case STOPPED: + return Command.TERMINATE; default: return Command.NOP; } @@ -68,7 +86,9 @@ public enum State { public State getNextState(CommandResult result) throws IllegalArgumentException { switch (result) { case IN_PROGRESS: - if (this == State.INSTALLING || this == State.STARTING) { + if (this == State.INSTALLING || this == State.STARTING + || this == State.UPGRADING || this == State.STOPPING + || this == State.TERMINATING) { return this; } else { throw new IllegalArgumentException(result + " is not valid for " + this); @@ -78,6 +98,12 @@ public enum State { return State.INSTALLED; } else if (this == State.STARTING) { return State.STARTED; + } else if (this == State.UPGRADING) { + return State.UPGRADED; + } else if (this == State.STOPPING) { + return State.STOPPED; + } else if (this == State.STOPPED) { + return State.TERMINATING; } else { throw new IllegalArgumentException(result + " is not valid for " + this); } @@ -86,6 +112,16 @@ public enum State { return State.INSTALL_FAILED; } else if (this == State.STARTING) { return State.INSTALLED; + } else if (this == State.UPGRADING) { + // if pre-upgrade failed, force stop now, so mark it upgraded + // what other options can be exposed to app owner? + return State.UPGRADED; + } else if (this == State.STOPPING) { + // if stop fails, force mark it stopped (and let container terminate) + return State.STOPPED; + } else if (this == State.STOPPED) { + // if in stopped state, force mark it as terminating + return State.TERMINATING; } else { throw new IllegalArgumentException(result + " is not valid for " + this); } @@ -113,6 +149,24 @@ public enum State { } else { throw new IllegalArgumentException(command + " is not valid for " + this); } + case UPGRADE: + if (this == State.STARTED) { + return State.UPGRADING; + } else { + throw new IllegalArgumentException(command + " is not valid for " + this); + } + case STOP: + if (this == State.STARTED || this == State.UPGRADED) { + return State.STOPPING; + } else { + throw new IllegalArgumentException(command + " is not valid for " + this); + } + case TERMINATE: + if (this == State.STOPPED) { + return State.TERMINATING; + } else { + throw new IllegalArgumentException(command + " is not valid for " + this); + } case NOP: return this; default: @@ -121,8 +175,13 @@ public enum State { } public boolean couldHaveIssued(Command command) { - if ((this == State.INSTALLING && command == Command.INSTALL) || - (this == State.STARTING && command == Command.START)) { + if ((this == State.INSTALLING && command == Command.INSTALL) + || (this == State.STARTING && command == Command.START) + || (this == State.UPGRADING && command == Command.UPGRADE) + || (this == State.STOPPING + && (command == Command.STOP || command == Command.NOP)) + || (this == State.TERMINATING && command == Command.TERMINATE) + ) { return true; } return false; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 8a52043..6ef5f8e 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -22,6 +22,8 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.health.HealthCheckRegistry; import com.google.common.base.Preconditions; import com.google.protobuf.BlockingService; + +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; @@ -110,6 +112,7 @@ import org.apache.slider.providers.ProviderRole; import org.apache.slider.providers.ProviderService; import org.apache.slider.providers.SliderProviderFactory; import org.apache.slider.providers.agent.AgentKeys; +import org.apache.slider.providers.agent.AgentProviderService; import org.apache.slider.providers.slideram.SliderAMClientProvider; import org.apache.slider.providers.slideram.SliderAMProviderService; import org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance; @@ -118,6 +121,7 @@ import org.apache.slider.server.appmaster.actions.RegisterComponentInstance; import org.apache.slider.server.appmaster.actions.QueueExecutor; import org.apache.slider.server.appmaster.actions.QueueService; import org.apache.slider.server.appmaster.actions.ActionStopSlider; +import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers; import org.apache.slider.server.appmaster.actions.AsyncAction; import org.apache.slider.server.appmaster.actions.RenewingAction; import org.apache.slider.server.appmaster.actions.ResetFailureWindow; @@ -141,7 +145,6 @@ import org.apache.slider.server.appmaster.state.ContainerAssignment; import org.apache.slider.server.appmaster.state.ProviderAppState; import org.apache.slider.server.appmaster.operations.RMOperationHandler; import org.apache.slider.server.appmaster.state.RoleInstance; -import org.apache.slider.server.appmaster.state.RoleStatus; import org.apache.slider.server.appmaster.web.AgentService; import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer; import org.apache.slider.server.appmaster.web.rest.agent.AgentWebApp; @@ -161,7 +164,9 @@ import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProvide import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; import java.io.File; +import java.io.FileReader; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; @@ -1617,6 +1622,58 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } /** + * Signal that containers are being upgraded + * + * @param upgradeContainersRequest + * request containing upgrade details + */ + public synchronized void onUpgradeContainers( + ActionUpgradeContainers upgradeContainersRequest) throws IOException, + SliderException { + LOG_YARN.info("onUpgradeContainers([{}]", + upgradeContainersRequest.getMessage()); + List<String> containers = upgradeContainersRequest.getContainers(); + if (CollectionUtils.isEmpty(containers)) { + // components will not be null here, since it is pre-checked + List<String> components = upgradeContainersRequest.getComponents(); + Map<ContainerId, RoleInstance> liveContainers = appState.getLiveNodes(); + containers = new ArrayList<String>(); + Map<String, List<String>> roleContainerMap = prepareRoleContainerMap(liveContainers); + for (String component : components) { + List<String> roleContainers = roleContainerMap.get(component); + if (roleContainers != null) { + containers.addAll(roleContainers); + } + } + } + LOG_YARN.info("Containers to be upgraded (total {}) : {}", containers.size(), + containers); + if (providerService instanceof AgentProviderService) { + AgentProviderService agentProviderService = (AgentProviderService) providerService; + agentProviderService.setInUpgradeMode(true); + agentProviderService.addUpgradeContainers(containers); + } + } + + // create a reverse map of roles -> list of all live containers + private Map<String, List<String>> prepareRoleContainerMap( + Map<ContainerId, RoleInstance> liveContainers) { + Map<String, List<String>> roleContainerMap = new HashMap<String, List<String>>(); + for (Map.Entry<ContainerId, RoleInstance> liveContainer : liveContainers + .entrySet()) { + RoleInstance role = liveContainer.getValue(); + if (roleContainerMap.containsKey(role.role)) { + roleContainerMap.get(role.role).add(liveContainer.getKey().toString()); + } else { + List<String> containers = new ArrayList<String>(); + containers.add(liveContainer.getKey().toString()); + roleContainerMap.put(role.role, containers); + } + } + return roleContainerMap; + } + + /** * Implementation of cluster flexing. * It should be the only way that anything -even the AM itself on startup- * asks for nodes. http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java new file mode 100644 index 0000000..ad3bb92 --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.appmaster.actions; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.slider.server.appmaster.SliderAppMaster; +import org.apache.slider.server.appmaster.state.AppState; + +public class ActionUpgradeContainers extends AsyncAction { + private int exitCode; + private FinalApplicationStatus finalApplicationStatus; + private String message; + private List<String> containers; + private List<String> components; + + public ActionUpgradeContainers(String name, + long delay, + TimeUnit timeUnit, + int exitCode, + FinalApplicationStatus finalApplicationStatus, + List<String> containers, + List<String> components, + String message) { + super(name, delay, timeUnit); + this.exitCode = exitCode; + this.finalApplicationStatus = finalApplicationStatus; + this.containers = containers; + this.components = components; + this.message = message; + } + + @Override + public void execute(SliderAppMaster appMaster, QueueAccess queueService, + AppState appState) throws Exception { + if (CollectionUtils.isNotEmpty(this.containers) + || CollectionUtils.isNotEmpty(this.components)) { + SliderAppMaster.getLog().info("SliderAppMaster.upgradeContainers: {}", + message); + appMaster.onUpgradeContainers(this); + } + } + + public int getExitCode() { + return exitCode; + } + + public void setExitCode(int exitCode) { + this.exitCode = exitCode; + } + + public FinalApplicationStatus getFinalApplicationStatus() { + return finalApplicationStatus; + } + + public void setFinalApplicationStatus( + FinalApplicationStatus finalApplicationStatus) { + this.finalApplicationStatus = finalApplicationStatus; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public List<String> getContainers() { + return containers; + } + + public void setContainers(List<String> containers) { + this.containers = containers; + } + + public List<String> getComponents() { + return components; + } + + public void setComponents(List<String> components) { + this.components = components; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java index 14b2bef..c597626 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java @@ -61,6 +61,17 @@ public class SliderClusterProtocolPBImpl implements SliderClusterProtocolPB { } @Override + public Messages.UpgradeContainersResponseProto upgradeContainers(RpcController controller, + Messages.UpgradeContainersRequestProto request) throws + ServiceException { + try { + return real.upgradeContainers(request); + } catch (Exception e) { + throw wrap(e); + } + } + + @Override public Messages.FlexClusterResponseProto flexCluster(RpcController controller, Messages.FlexClusterRequestProto request) throws ServiceException { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java index ad4cca4..2d927c6 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java @@ -100,6 +100,17 @@ public class SliderClusterProtocolProxy implements SliderClusterProtocol { } @Override + public Messages.UpgradeContainersResponseProto upgradeContainers( + Messages.UpgradeContainersRequestProto request) throws IOException, + YarnException { + try { + return endpoint.upgradeContainers(NULL_CONTROLLER, request); + } catch (ServiceException e) { + throw convert(e); + } + } + + @Override public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) throws IOException { try { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java index d5822e9..7e25cd0 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java @@ -43,6 +43,7 @@ import org.apache.slider.server.appmaster.actions.ActionFlexCluster; import org.apache.slider.server.appmaster.actions.ActionHalt; import org.apache.slider.server.appmaster.actions.ActionKillContainer; import org.apache.slider.server.appmaster.actions.ActionStopSlider; +import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers; import org.apache.slider.server.appmaster.actions.AsyncAction; import org.apache.slider.server.appmaster.actions.QueueAccess; import org.apache.slider.server.appmaster.management.MetricsAndMonitoring; @@ -188,6 +189,29 @@ public class SliderIPCService extends AbstractService } @Override //SliderClusterProtocol + public Messages.UpgradeContainersResponseProto upgradeContainers( + Messages.UpgradeContainersRequestProto request) throws IOException, + YarnException { + onRpcCall("upgrade"); + String message = request.getMessage(); + if (message == null) { + message = "application containers upgraded by client"; + } + ActionUpgradeContainers upgradeContainers = + new ActionUpgradeContainers( + "Upgrade containers", + 1000, TimeUnit.MILLISECONDS, + LauncherExitCodes.EXIT_SUCCESS, + FinalApplicationStatus.SUCCEEDED, + request.getContainerList(), + request.getComponentList(), + message); + log.info("SliderAppMasterApi.upgradeContainers: {}", upgradeContainers); + schedule(upgradeContainers); + return Messages.UpgradeContainersResponseProto.getDefaultInstance(); + } + + @Override //SliderClusterProtocol public Messages.FlexClusterResponseProto flexCluster(Messages.FlexClusterRequestProto request) throws IOException { onRpcCall("flex"); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/proto/SliderClusterMessages.proto ---------------------------------------------------------------------- diff --git a/slider-core/src/main/proto/SliderClusterMessages.proto b/slider-core/src/main/proto/SliderClusterMessages.proto index c2eba89..5e770d5 100644 --- a/slider-core/src/main/proto/SliderClusterMessages.proto +++ b/slider-core/src/main/proto/SliderClusterMessages.proto @@ -51,6 +51,7 @@ message StopClusterRequestProto { */ required string message = 1; } + /** * stop the cluster */ @@ -58,6 +59,24 @@ message StopClusterResponseProto { } /** + * upgrade the containers + */ +message UpgradeContainersRequestProto { + /** + message to include + */ + required string message = 1; + repeated string container = 2; + repeated string component = 3; +} + +/** + * upgrade the containers + */ +message UpgradeContainersResponseProto { +} + +/** * flex the cluster */ message FlexClusterRequestProto { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/proto/SliderClusterProtocol.proto ---------------------------------------------------------------------- diff --git a/slider-core/src/main/proto/SliderClusterProtocol.proto b/slider-core/src/main/proto/SliderClusterProtocol.proto index d2ba723..aa59bb4 100644 --- a/slider-core/src/main/proto/SliderClusterProtocol.proto +++ b/slider-core/src/main/proto/SliderClusterProtocol.proto @@ -56,6 +56,12 @@ service SliderClusterProtocolPB { returns(StopClusterResponseProto); /** + * Upgrade containers + */ + rpc upgradeContainers(UpgradeContainersRequestProto) + returns(UpgradeContainersResponseProto); + + /** * Flex the cluster. */ rpc flexCluster(FlexClusterRequestProto)
