Repository: incubator-slider
Updated Branches:
  refs/heads/develop 566d1206a -> 7ae7d1532


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index d95230f..3ee71f2 100644
--- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ 
b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -165,6 +165,8 @@ public class AgentProviderService extends 
AbstractProviderService implements
   private Boolean canAnyMasterPublish = null;
   private AgentLaunchParameter agentLaunchParameter = null;
   private String clusterName = null;
+  private boolean isInUpgradeMode;
+  private Set<String> upgradeContainers = new HashSet<String>();
 
   private final Map<String, ComponentInstanceState> componentStatuses =
       new ConcurrentHashMap<String, ComponentInstanceState>();
@@ -714,6 +716,10 @@ public class AgentProviderService extends 
AbstractProviderService implements
     String label = heartBeat.getHostname();
     String roleName = getRoleName(label);
     String containerId = getContainerId(label);
+    boolean doUpgrade = false;
+    if (isInUpgradeMode && upgradeContainers.contains(containerId)) {
+      doUpgrade = true;
+    }
 
     StateAccessForProviders accessor = getAmState();
     CommandScript cmdScript = getScriptPathFromMetainfo(roleName);
@@ -748,6 +754,23 @@ public class AgentProviderService extends 
AbstractProviderService implements
     Boolean isMaster = isMaster(roleName);
     ComponentInstanceState componentStatus = getComponentStatuses().get(label);
     componentStatus.heartbeat(System.currentTimeMillis());
+    if (doUpgrade) {
+      switch (componentStatus.getState()) {
+      case STARTED:
+        componentStatus.setTargetState(State.UPGRADED);
+        break;
+      case UPGRADED:
+        componentStatus.setTargetState(State.STOPPED);
+        break;
+      case STOPPED:
+        componentStatus.setTargetState(State.TERMINATING);
+        break;
+      default:
+        break;
+      }
+      log.info("Current state = {} target state {}",
+          componentStatus.getState(), componentStatus.getTargetState());
+    }
 
     publishConfigAndExportGroups(heartBeat, componentStatus, roleName);
 
@@ -761,8 +784,9 @@ public class AgentProviderService extends 
AbstractProviderService implements
       CommandResult result = 
CommandResult.getCommandResult(report.getStatus());
       Command command = Command.getCommand(report.getRoleCommand());
       componentStatus.applyCommandResult(result, command);
-      log.info("Component operation. Status: {}; new container state: {}",
-          result, componentStatus.getContainerState());
+      log.info("Component operation. Status: {}; new container state: {};"
+          + " new component state: {}", result,
+          componentStatus.getContainerState(), componentStatus.getState());
 
       if (command == Command.INSTALL && 
SliderUtils.isNotEmpty(report.getFolders())) {
         publishFolderPaths(report.getFolders(), containerId, roleName, 
heartBeat.getFqdn());
@@ -778,7 +802,7 @@ public class AgentProviderService extends 
AbstractProviderService implements
       return response;
     }
 
-    Command command = componentStatus.getNextCommand();
+    Command command = componentStatus.getNextCommand(doUpgrade);
     try {
       if (Command.NOP != command) {
         if (command == Command.INSTALL) {
@@ -830,6 +854,18 @@ public class AgentProviderService extends 
AbstractProviderService implements
           } else {
             log.info("Start of {} on {} delayed as dependencies have not 
started.", roleName, containerId);
           }
+        } else if (command == Command.UPGRADE) {
+          addUpgradeCommand(roleName, containerId, response, scriptPath,
+              timeout);
+          componentStatus.commandIssued(command, true);
+        } else if (command == Command.STOP) {
+          addStopCommand(roleName, containerId, response, scriptPath, timeout,
+              doUpgrade);
+          componentStatus.commandIssued(command);
+        } else if (command == Command.TERMINATE) {
+          log.info("A formal terminate command is being sent to container {}"
+              + " in state {}", label, componentStatus.getState());
+          response.setTerminateAgent(true);
         }
       }
 
@@ -1071,6 +1107,14 @@ public class AgentProviderService extends 
AbstractProviderService implements
     this.heartbeatMonitorInterval = heartbeatMonitorInterval;
   }
 
+  /**
+   * Turn upgrade mode on or off for this provider.
+   * Heartbeat processing treats a container as upgrading only when this flag
+   * is set and the container id is present in {@code upgradeContainers}.
+   *
+   * @param inUpgradeMode new value of the {@code isInUpgradeMode} field
+   */
+  public void setInUpgradeMode(boolean inUpgradeMode) {
+    this.isInUpgradeMode = inUpgradeMode;
+  }
+
+  /**
+   * Add container ids to the set of containers selected for upgrade.
+   * This method only adds: ids passed in earlier calls stay in the set.
+   *
+   * @param upgradeContainers container ids to add; must not be null, as the
+   *          list is handed straight to {@code Set.addAll}
+   */
+  public void addUpgradeContainers(List<String> upgradeContainers) {
+    this.upgradeContainers.addAll(upgradeContainers);
+  }
+
   /**
    * Read all default configs
    *
@@ -1929,6 +1973,72 @@ public class AgentProviderService extends 
AbstractProviderService implements
     response.addExecutionCommand(cmdStop);
   }
 
+  /**
+   * Build an UPGRADE execution command for one component instance and append
+   * it to the heartbeat response.
+   *
+   * @param componentName name used as both the component name and the role
+   * @param containerId id stored in the host level parameters
+   * @param response heartbeat response that receives the command
+   * @param scriptPath script path passed on to the command parameters
+   * @param timeout timeout passed on to the command parameters
+   * @throws SliderException if one of the helper calls made while building
+   *           the command fails
+   */
+  @VisibleForTesting
+  protected void addUpgradeCommand(String componentName, String containerId,
+      HeartBeatResponse response, String scriptPath, long timeout)
+      throws SliderException {
+    assert getAmState().isApplicationLive();
+    ConfTreeOperations appSnapshot = getAmState().getAppConfSnapshot();
+    ConfTreeOperations internals = getAmState().getInternalsSnapshot();
+
+    ExecutionCommand upgradeCmd = new ExecutionCommand(
+        AgentCommandType.EXECUTION_COMMAND);
+    prepareExecutionCommand(upgradeCmd);
+
+    // the application name is used as cluster name and as service name
+    String appName = internals.get(OptionKeys.APPLICATION_NAME);
+    upgradeCmd.setHostname(
+        getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME));
+    upgradeCmd.setClusterName(appName);
+    upgradeCmd.setRoleCommand(Command.UPGRADE.toString());
+    upgradeCmd.setServiceName(appName);
+    upgradeCmd.setComponentName(componentName);
+    upgradeCmd.setRole(componentName);
+
+    Map<String, String> hostParams = new TreeMap<String, String>();
+    hostParams.put(JAVA_HOME,
+        appSnapshot.getGlobalOptions().getMandatoryOption(JAVA_HOME));
+    hostParams.put(CONTAINER_ID, containerId);
+    upgradeCmd.setHostLevelParams(hostParams);
+    upgradeCmd.setCommandParams(
+        commandParametersSet(scriptPath, timeout, true));
+    upgradeCmd.setConfigurations(
+        buildCommandConfigurations(appSnapshot, containerId, componentName));
+
+    response.addExecutionCommand(upgradeCmd);
+  }
+    
+  /**
+   * Build a stop execution command for one component instance and append it
+   * to the heartbeat response.
+   *
+   * @param componentName name used as both the component name and the role
+   * @param containerId id stored in the host level parameters
+   * @param response heartbeat response that receives the command
+   * @param scriptPath script path passed on to the command parameters
+   * @param timeout timeout passed on to the command parameters
+   * @param isInUpgradeMode when true the role command is sent in its
+   *          transformed form (UPGRADE_STOP) instead of plain STOP
+   * @throws SliderException if one of the helper calls made while building
+   *           the command fails
+   */
+  protected void addStopCommand(String componentName, String containerId,
+      HeartBeatResponse response, String scriptPath, long timeout,
+      boolean isInUpgradeMode) throws SliderException {
+    assert getAmState().isApplicationLive();
+    ConfTreeOperations appSnapshot = getAmState().getAppConfSnapshot();
+    ConfTreeOperations internals = getAmState().getInternalsSnapshot();
+
+    ExecutionCommand stopCmd = new ExecutionCommand(
+        AgentCommandType.EXECUTION_COMMAND);
+    stopCmd.setTaskId(taskId.get());
+    stopCmd.setCommandId(stopCmd.getTaskId() + "-1");
+
+    // the application name is used as cluster name and as service name
+    String appName = internals.get(OptionKeys.APPLICATION_NAME);
+    stopCmd.setHostname(
+        getClusterInfoPropertyValue(StatusKeys.INFO_AM_HOSTNAME));
+    stopCmd.setClusterName(appName);
+    // a stop issued during an upgrade is told apart from a regular stop by
+    // its transformed role command
+    stopCmd.setRoleCommand(Command.transform(Command.STOP, isInUpgradeMode));
+    stopCmd.setServiceName(appName);
+    stopCmd.setComponentName(componentName);
+    stopCmd.setRole(componentName);
+
+    Map<String, String> hostParams = new TreeMap<String, String>();
+    hostParams.put(JAVA_HOME,
+        appSnapshot.getGlobalOptions().getMandatoryOption(JAVA_HOME));
+    hostParams.put(CONTAINER_ID, containerId);
+    stopCmd.setHostLevelParams(hostParams);
+    stopCmd.setCommandParams(
+        commandParametersSet(scriptPath, timeout, true));
+    stopCmd.setConfigurations(
+        buildCommandConfigurations(appSnapshot, containerId, componentName));
+
+    response.addExecutionCommand(stopCmd);
+  }
+
   protected static String getJDKDir() {
     File javaHome = new File(System.getProperty("java.home")).getParentFile();
     File jdkDirectory = null;

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java 
b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
index a851803..7d13a8f 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/Command.java
@@ -23,7 +23,9 @@ public enum Command {
   NOP,      // do nothing
   INSTALL,  // Install the component
   START,    // Start the component
-  STOP;     // Stop the component
+  STOP,     // Stop the component
+  UPGRADE,  // The component will undergo upgrade
+  TERMINATE;// Send terminate signal to agent
 
   public static Command getCommand(String commandVal) {
     if (commandVal.equals(Command.START.toString())) {
@@ -35,7 +37,22 @@ public enum Command {
     if (commandVal.equals(Command.STOP.toString())) {
       return Command.STOP;
     }
+    if (commandVal.equals(Command.UPGRADE.toString())) {
+      return Command.UPGRADE;
+    }
+    if (commandVal.equals(Command.TERMINATE.toString())) {
+      return Command.TERMINATE;
+    }
 
     return Command.NOP;
   }
+
+  /**
+   * Convert a command into the role command string sent to the agent.
+   * Only STOP has an upgrade-specific form; every other command maps to its
+   * enum name regardless of the flag.
+   *
+   * @param command command to convert
+   * @param isUpgrade true when the command is issued as part of an upgrade
+   * @return "UPGRADE_STOP" for STOP during an upgrade, otherwise the
+   *         command's name
+   */
+  public static String transform(Command command, boolean isUpgrade) {
+    if (isUpgrade && command == Command.STOP) {
+      return "UPGRADE_STOP";
+    }
+    return command.name();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
 
b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
index a50f3c0..dd78278 100644
--- 
a/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
+++ 
b/slider-core/src/main/java/org/apache/slider/providers/agent/ComponentInstanceState.java
@@ -98,7 +98,11 @@ public class ComponentInstanceState {
   }
 
   public void commandIssued(Command command) {
-    Command expected = getNextCommand();
+    commandIssued(command, false);
+  }
+
+  public void commandIssued(Command command, boolean isInUpgradeMode) {
+    Command expected = getNextCommand(isInUpgradeMode);
     if (expected != command) {
       throw new IllegalArgumentException("Command " + command + " is not 
allowed in state " + state);
     }
@@ -139,11 +143,15 @@ public class ComponentInstanceState {
   }
 
   public Command getNextCommand() {
+    // same as getNextCommand(boolean) with upgrade mode switched off
+    return getNextCommand(false);
+  }
+
+  public Command getNextCommand(boolean isInUpgradeMode) {
     if (!hasPendingCommand()) {
       return Command.NOP;
     }
 
-    return this.state.getSupportedCommand();
+    return this.state.getSupportedCommand(isInUpgradeMode);
   }
 
   public State getState() {
@@ -155,6 +163,14 @@ public class ComponentInstanceState {
     this.state = state;
   }
 
+  /**
+   * @return the value of the {@code targetState} field of this instance
+   */
+  public State getTargetState() {
+    return targetState;
+  }
+
+  /**
+   * Replace the target state of this instance. No validation is done here.
+   *
+   * @param targetState new value of the {@code targetState} field
+   */
+  public void setTargetState(State targetState) {
+    this.targetState = targetState;
+  }
+
   @Override
   public int hashCode() {
     int hashCode = 1;

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/providers/agent/State.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/providers/agent/State.java 
b/slider-core/src/main/java/org/apache/slider/providers/agent/State.java
index 09732a5..0738740 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/State.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/State.java
@@ -25,7 +25,13 @@ public enum State {
   INSTALLED,      // Installed (or stopped)
   STARTING,       // Starting
   STARTED,        // Started
-  INSTALL_FAILED;  // Install failed, start failure in INSTALLED
+  INSTALL_FAILED, // Install failed, start failure in INSTALLED
+  UPGRADING,      // Undergoing upgrade, perform necessary pre-upgrade steps
+  UPGRADED,       // Pre-upgrade steps completed
+  STOPPING,       // Stop has been issued
+  STOPPED,        // Agent has stopped
+  TERMINATING;    // Terminate signal to ask the agent to kill itself
+                  // No need for state TERMINATED (as the agent is dead by 
then)
 
   /**
    * Indicates whether or not it is a valid state to produce a command.
@@ -36,7 +42,9 @@ public enum State {
     switch (this) {
       case INSTALLING:
       case STARTING:
-      case STARTED:
+      case UPGRADING:
+      case STOPPING:
+      case TERMINATING:
         return false;
       default:
         return true;
@@ -49,12 +57,22 @@ public enum State {
    * @return command allowed in this state.
    */
   public Command getSupportedCommand() {
+    // same as getSupportedCommand(boolean) with upgrade mode switched off
+    return getSupportedCommand(false);
+  }
+
+  public Command getSupportedCommand(boolean isInUpgradeMode) {
     switch (this) {
       case INIT:
       case INSTALL_FAILED:
         return Command.INSTALL;
       case INSTALLED:
         return Command.START;
+      case STARTED:
+        return isInUpgradeMode ? Command.UPGRADE : Command.NOP;
+      case UPGRADED:
+        return Command.STOP;
+      case STOPPED:
+        return Command.TERMINATE;
       default:
         return Command.NOP;
     }
@@ -68,7 +86,9 @@ public enum State {
   public State getNextState(CommandResult result) throws 
IllegalArgumentException {
     switch (result) {
       case IN_PROGRESS:
-        if (this == State.INSTALLING || this == State.STARTING) {
+        if (this == State.INSTALLING || this == State.STARTING
+            || this == State.UPGRADING || this == State.STOPPING
+            || this == State.TERMINATING) {
           return this;
         } else {
           throw new IllegalArgumentException(result + " is not valid for " + 
this);
@@ -78,6 +98,12 @@ public enum State {
           return State.INSTALLED;
         } else if (this == State.STARTING) {
           return State.STARTED;
+        } else if (this == State.UPGRADING) {
+          return State.UPGRADED;
+        } else if (this == State.STOPPING) {
+          return State.STOPPED;
+        } else if (this == State.STOPPED) {
+          return State.TERMINATING;
         } else {
           throw new IllegalArgumentException(result + " is not valid for " + 
this);
         }
@@ -86,6 +112,16 @@ public enum State {
           return State.INSTALL_FAILED;
         } else if (this == State.STARTING) {
           return State.INSTALLED;
+        } else if (this == State.UPGRADING) {
+          // if pre-upgrade failed, force stop now, so mark it upgraded
+          // what other options can be exposed to app owner?
+          return State.UPGRADED;
+        } else if (this == State.STOPPING) {
+          // if stop fails, force mark it stopped (and let container terminate)
+          return State.STOPPED;
+        } else if (this == State.STOPPED) {
+          // if in stopped state, force mark it as terminating
+          return State.TERMINATING;
         } else {
           throw new IllegalArgumentException(result + " is not valid for " + 
this);
         }
@@ -113,6 +149,24 @@ public enum State {
         } else {
           throw new IllegalArgumentException(command + " is not valid for " + 
this);
         }
+      case UPGRADE:
+        if (this == State.STARTED) {
+          return State.UPGRADING;
+        } else {
+          throw new IllegalArgumentException(command + " is not valid for " + 
this);
+        }
+      case STOP:
+        if (this == State.STARTED || this == State.UPGRADED) {
+          return State.STOPPING;
+        } else {
+          throw new IllegalArgumentException(command + " is not valid for " + 
this);
+        }
+      case TERMINATE:
+        if (this == State.STOPPED) {
+          return State.TERMINATING;
+        } else {
+          throw new IllegalArgumentException(command + " is not valid for " + 
this);
+        }
       case NOP:
         return this;
       default:
@@ -121,8 +175,13 @@ public enum State {
   }
 
   public boolean couldHaveIssued(Command command) {
-    if ((this == State.INSTALLING && command == Command.INSTALL) ||
-        (this == State.STARTING && command == Command.START)) {
+    if ((this == State.INSTALLING && command == Command.INSTALL)
+        || (this == State.STARTING && command == Command.START)
+        || (this == State.UPGRADING && command == Command.UPGRADE)
+        || (this == State.STOPPING 
+           && (command == Command.STOP || command == Command.NOP))
+        || (this == State.TERMINATING && command == Command.TERMINATE)
+       ) {
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 8a52043..6ef5f8e 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -22,6 +22,8 @@ import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.health.HealthCheckRegistry;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
+
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
@@ -110,6 +112,7 @@ import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.ProviderService;
 import org.apache.slider.providers.SliderProviderFactory;
 import org.apache.slider.providers.agent.AgentKeys;
+import org.apache.slider.providers.agent.AgentProviderService;
 import org.apache.slider.providers.slideram.SliderAMClientProvider;
 import org.apache.slider.providers.slideram.SliderAMProviderService;
 import 
org.apache.slider.server.appmaster.actions.ActionRegisterServiceInstance;
@@ -118,6 +121,7 @@ import 
org.apache.slider.server.appmaster.actions.RegisterComponentInstance;
 import org.apache.slider.server.appmaster.actions.QueueExecutor;
 import org.apache.slider.server.appmaster.actions.QueueService;
 import org.apache.slider.server.appmaster.actions.ActionStopSlider;
+import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
 import org.apache.slider.server.appmaster.actions.AsyncAction;
 import org.apache.slider.server.appmaster.actions.RenewingAction;
 import org.apache.slider.server.appmaster.actions.ResetFailureWindow;
@@ -141,7 +145,6 @@ import 
org.apache.slider.server.appmaster.state.ContainerAssignment;
 import org.apache.slider.server.appmaster.state.ProviderAppState;
 import org.apache.slider.server.appmaster.operations.RMOperationHandler;
 import org.apache.slider.server.appmaster.state.RoleInstance;
-import org.apache.slider.server.appmaster.state.RoleStatus;
 import org.apache.slider.server.appmaster.web.AgentService;
 import org.apache.slider.server.appmaster.web.rest.InsecureAmFilterInitializer;
 import org.apache.slider.server.appmaster.web.rest.agent.AgentWebApp;
@@ -161,7 +164,9 @@ import 
org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProvide
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -1617,6 +1622,58 @@ public class SliderAppMaster extends 
AbstractSliderLaunchedService
   }
 
   /**
+   * Signal that containers are being upgraded.
+   * <p>
+   * If the request lists no containers, the ids of all live containers that
+   * belong to the requested components are used instead. The resulting ids
+   * are registered with the provider when it is an
+   * {@link AgentProviderService}; for any other provider a warning is logged
+   * and nothing else happens.
+   *
+   * @param upgradeContainersRequest
+   *          request containing upgrade details
+   * @throws IOException declared by the signature; propagated from the calls
+   *           made here if any of them raises it
+   * @throws SliderException declared by the signature; propagated from the
+   *           calls made here if any of them raises it
+   */
+  public synchronized void onUpgradeContainers(
+      ActionUpgradeContainers upgradeContainersRequest) throws IOException,
+      SliderException {
+    LOG_YARN.info("onUpgradeContainers([{}])",
+        upgradeContainersRequest.getMessage());
+    List<String> containers = upgradeContainersRequest.getContainers();
+    if (CollectionUtils.isEmpty(containers)) {
+      containers = new ArrayList<String>();
+      // ActionUpgradeContainers.execute() pre-checks the request, but this
+      // method is public, so a missing component list is tolerated as well
+      List<String> components = upgradeContainersRequest.getComponents();
+      if (CollectionUtils.isNotEmpty(components)) {
+        Map<ContainerId, RoleInstance> liveContainers =
+            appState.getLiveNodes();
+        Map<String, List<String>> roleContainerMap =
+            prepareRoleContainerMap(liveContainers);
+        for (String component : components) {
+          List<String> roleContainers = roleContainerMap.get(component);
+          // a component without live containers contributes nothing
+          if (roleContainers != null) {
+            containers.addAll(roleContainers);
+          }
+        }
+      }
+    }
+    LOG_YARN.info("Containers to be upgraded (total {}) : {}",
+        containers.size(), containers);
+    if (providerService instanceof AgentProviderService) {
+      AgentProviderService agentProviderService =
+          (AgentProviderService) providerService;
+      agentProviderService.setInUpgradeMode(true);
+      agentProviderService.addUpgradeContainers(containers);
+    } else {
+      // do not drop the request silently: leave a trace for the operator
+      LOG_YARN.warn("Container upgrade is only supported by the agent"
+          + " provider; upgrade request ignored");
+    }
+  }
+
+  /**
+   * Create a reverse map of roles -> list of all live containers.
+   *
+   * @param liveContainers live containers keyed by container id
+   * @return map from role name to the string form of the ids of the
+   *         containers running that role
+   */
+  private Map<String, List<String>> prepareRoleContainerMap(
+      Map<ContainerId, RoleInstance> liveContainers) {
+    Map<String, List<String>> containersByRole =
+        new HashMap<String, List<String>>();
+    for (Map.Entry<ContainerId, RoleInstance> entry
+        : liveContainers.entrySet()) {
+      String roleName = entry.getValue().role;
+      List<String> ids = containersByRole.get(roleName);
+      if (ids == null) {
+        // first container seen for this role
+        ids = new ArrayList<String>();
+        containersByRole.put(roleName, ids);
+      }
+      ids.add(entry.getKey().toString());
+    }
+    return containersByRole;
+  }
+
+  /**
    * Implementation of cluster flexing.
    * It should be the only way that anything -even the AM itself on startup-
    * asks for nodes. 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java
new file mode 100644
index 0000000..ad3bb92
--- /dev/null
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionUpgradeContainers.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.appmaster.actions;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.slider.server.appmaster.SliderAppMaster;
+import org.apache.slider.server.appmaster.state.AppState;
+
+public class ActionUpgradeContainers extends AsyncAction {
+  private int exitCode;
+  private FinalApplicationStatus finalApplicationStatus;
+  private String message;
+  private List<String> containers;
+  private List<String> components;
+
+  public ActionUpgradeContainers(String name,
+      long delay,
+      TimeUnit timeUnit,
+      int exitCode,
+      FinalApplicationStatus finalApplicationStatus,
+      List<String> containers,
+      List<String> components,
+      String message) {
+    super(name, delay, timeUnit);
+    this.exitCode = exitCode;
+    this.finalApplicationStatus = finalApplicationStatus;
+    this.containers = containers;
+    this.components = components;
+    this.message = message;
+  }
+
+  @Override
+  public void execute(SliderAppMaster appMaster, QueueAccess queueService,
+      AppState appState) throws Exception {
+    if (CollectionUtils.isNotEmpty(this.containers)
+        || CollectionUtils.isNotEmpty(this.components)) {
+      SliderAppMaster.getLog().info("SliderAppMaster.upgradeContainers: {}",
+          message);
+      appMaster.onUpgradeContainers(this);
+    }
+  }
+
+  public int getExitCode() {
+    return exitCode;
+  }
+
+  public void setExitCode(int exitCode) {
+    this.exitCode = exitCode;
+  }
+
+  public FinalApplicationStatus getFinalApplicationStatus() {
+    return finalApplicationStatus;
+  }
+
+  public void setFinalApplicationStatus(
+      FinalApplicationStatus finalApplicationStatus) {
+    this.finalApplicationStatus = finalApplicationStatus;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
+  public List<String> getContainers() {
+    return containers;
+  }
+
+  public void setContainers(List<String> containers) {
+    this.containers = containers;
+  }
+
+  public List<String> getComponents() {
+    return components;
+  }
+
+  public void setComponents(List<String> components) {
+    this.components = components;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
index 14b2bef..c597626 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolPBImpl.java
@@ -61,6 +61,17 @@ public class SliderClusterProtocolPBImpl implements 
SliderClusterProtocolPB {
   }
 
   @Override
+  public Messages.UpgradeContainersResponseProto 
upgradeContainers(RpcController controller,
+                                                       
Messages.UpgradeContainersRequestProto request) throws
+                                                                               
                  ServiceException {
+    try {
+      return real.upgradeContainers(request);
+    } catch (Exception e) {
+      throw wrap(e);
+    }
+  }
+
+  @Override
   public Messages.FlexClusterResponseProto flexCluster(RpcController 
controller,
                                                        
Messages.FlexClusterRequestProto request) throws
                                                                                
                  ServiceException {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
index ad4cca4..2d927c6 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolProxy.java
@@ -100,6 +100,17 @@ public class SliderClusterProtocolProxy implements 
SliderClusterProtocol {
   }
 
   @Override
+  public Messages.UpgradeContainersResponseProto upgradeContainers(
+      Messages.UpgradeContainersRequestProto request) throws IOException,
+      YarnException {
+    // forward the request to the RPC endpoint; a ServiceException raised by
+    // the call is rethrown as whatever convert(e) produces
+    try {
+      return endpoint.upgradeContainers(NULL_CONTROLLER, request);
+    } catch (ServiceException e) {
+      throw convert(e);
+    }
+  }
+
+  @Override
   public Messages.FlexClusterResponseProto 
flexCluster(Messages.FlexClusterRequestProto request)
       throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
index d5822e9..7e25cd0 100644
--- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
+++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java
@@ -43,6 +43,7 @@ import 
org.apache.slider.server.appmaster.actions.ActionFlexCluster;
 import org.apache.slider.server.appmaster.actions.ActionHalt;
 import org.apache.slider.server.appmaster.actions.ActionKillContainer;
 import org.apache.slider.server.appmaster.actions.ActionStopSlider;
+import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
 import org.apache.slider.server.appmaster.actions.AsyncAction;
 import org.apache.slider.server.appmaster.actions.QueueAccess;
 import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
@@ -188,6 +189,29 @@ public class SliderIPCService extends AbstractService
   }
 
   @Override //SliderClusterProtocol
+  public Messages.UpgradeContainersResponseProto upgradeContainers(
+      Messages.UpgradeContainersRequestProto request) throws IOException,
+      YarnException {
+    onRpcCall("upgrade");
+    // Generated protobuf string getters return "" rather than null, so the
+    // default text has to be applied for an empty message as well; the null
+    // test alone could never trigger.
+    String message = request.getMessage();
+    if (message == null || message.isEmpty()) {
+      message = "application containers upgraded by client";
+    }
+    // The action is created with a 1000 ms delay. Exit code and final status
+    // are required by the constructor; ActionUpgradeContainers.execute()
+    // does not read them.
+    ActionUpgradeContainers upgradeContainers =
+        new ActionUpgradeContainers(
+            "Upgrade containers",
+            1000, TimeUnit.MILLISECONDS,
+            LauncherExitCodes.EXIT_SUCCESS,
+            FinalApplicationStatus.SUCCEEDED,
+            request.getContainerList(),
+            request.getComponentList(),
+            message);
+    log.info("SliderAppMasterApi.upgradeContainers: {}", upgradeContainers);
+    // the action is only scheduled here; the reply carries no data
+    schedule(upgradeContainers);
+    return Messages.UpgradeContainersResponseProto.getDefaultInstance();
+  }
+
+  @Override //SliderClusterProtocol
   public Messages.FlexClusterResponseProto 
flexCluster(Messages.FlexClusterRequestProto request)
       throws IOException {
     onRpcCall("flex");

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/proto/SliderClusterMessages.proto
----------------------------------------------------------------------
diff --git a/slider-core/src/main/proto/SliderClusterMessages.proto 
b/slider-core/src/main/proto/SliderClusterMessages.proto
index c2eba89..5e770d5 100644
--- a/slider-core/src/main/proto/SliderClusterMessages.proto
+++ b/slider-core/src/main/proto/SliderClusterMessages.proto
@@ -51,6 +51,7 @@ message StopClusterRequestProto {
   */
   required string message = 1;
 }
+
 /**
  * stop the cluster
  */
@@ -58,6 +59,24 @@ message StopClusterResponseProto {
 }
 
 /**
+ * upgrade the containers
+ */
+message UpgradeContainersRequestProto {
+  /**
+  message to include
+  */
+  required string message = 1;
+  /**
+  containers to upgrade; when none are listed the component list is used
+  */
+  repeated string container = 2;
+  /**
+  components whose live containers are upgraded when no container is listed
+  */
+  repeated string component = 3;
+}
+
+/**
+ * upgrade the containers
+ */
+message UpgradeContainersResponseProto {
+}
+
+/**
  * flex the cluster
  */
 message FlexClusterRequestProto {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/proto/SliderClusterProtocol.proto
----------------------------------------------------------------------
diff --git a/slider-core/src/main/proto/SliderClusterProtocol.proto 
b/slider-core/src/main/proto/SliderClusterProtocol.proto
index d2ba723..aa59bb4 100644
--- a/slider-core/src/main/proto/SliderClusterProtocol.proto
+++ b/slider-core/src/main/proto/SliderClusterProtocol.proto
@@ -56,6 +56,12 @@ service SliderClusterProtocolPB {
     returns(StopClusterResponseProto);
     
   /**
+   * Upgrade containers 
+   */
+  rpc upgradeContainers(UpgradeContainersRequestProto) 
+    returns(UpgradeContainersResponseProto);
+
+  /**
    * Flex the cluster. 
    */
   rpc flexCluster(FlexClusterRequestProto) 

Reply via email to