YARN-8018.  Added support for initiating yarn service upgrade.
            Contributed by Chandni Singh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/27d60a16
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/27d60a16
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/27d60a16

Branch: refs/heads/HDFS-12943
Commit: 27d60a16342fd39973d43b61008f54a8815a6237
Parents: edb202e
Author: Eric Yang <ey...@apache.org>
Authored: Mon Mar 26 18:46:31 2018 -0400
Committer: Eric Yang <ey...@apache.org>
Committed: Mon Mar 26 18:46:31 2018 -0400

----------------------------------------------------------------------
 .../yarn/service/client/ApiServiceClient.java   |  18 ++
 .../hadoop/yarn/service/webapp/ApiServer.java   |  24 ++
 .../hadoop/yarn/service/ClientAMProtocol.java   |  12 +
 .../hadoop/yarn/service/ClientAMService.java    |  24 ++
 .../hadoop/yarn/service/ServiceEvent.java       |  49 ++++
 .../hadoop/yarn/service/ServiceEventType.java   |  28 +++
 .../hadoop/yarn/service/ServiceManager.java     | 225 +++++++++++++++++++
 .../hadoop/yarn/service/ServiceScheduler.java   |  19 ++
 .../yarn/service/UpgradeComponentsFinder.java   | 162 +++++++++++++
 .../service/api/records/ComponentState.java     |   2 +-
 .../yarn/service/api/records/ServiceState.java  |   2 +-
 .../yarn/service/client/ServiceClient.java      | 153 ++++++++++---
 .../yarn/service/component/Component.java       |  14 ++
 .../yarn/service/component/ComponentEvent.java  |  12 +
 .../service/component/ComponentEventType.java   |   4 +-
 .../yarn/service/component/ComponentState.java  |   3 +-
 .../yarn/service/conf/YarnServiceConstants.java |   2 +
 .../pb/client/ClientAMProtocolPBClientImpl.java |  26 +++
 .../service/ClientAMProtocolPBServiceImpl.java  |  24 ++
 .../yarn/service/utils/CoreFileSystem.java      |  31 ++-
 .../yarn/service/utils/ServiceApiUtil.java      |  28 ++-
 .../src/main/proto/ClientAMProtocol.proto       |  19 +-
 .../hadoop/yarn/service/ServiceTestUtils.java   |  90 +++++++-
 .../TestDefaultUpgradeComponentsFinder.java     |  63 ++++++
 .../hadoop/yarn/service/TestServiceManager.java | 156 +++++++++++++
 .../yarn/service/TestYarnNativeServices.java    |  76 ++++---
 .../yarn/service/client/TestServiceClient.java  | 125 +++++++++++
 .../yarn/service/utils/TestCoreFileSystem.java  |  46 ++++
 .../hadoop/yarn/client/api/AppAdminClient.java  |  16 ++
 29 files changed, 1374 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
index 49702e3..e4a245d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java
@@ -469,4 +469,22 @@ public class ApiServiceClient extends AppAdminClient {
     return output;
   }
 
+  /**
+   * Initiates an upgrade of a service from a local service definition file.
+   * The definition is loaded, flagged with the {@code UPGRADING} state,
+   * serialized to JSON and posted through the API client.
+   *
+   * @param appName name of the service to upgrade
+   * @param fileName local file holding the new service definition
+   * @return the code produced by {@code processResponse}, or
+   *         {@code EXIT_EXCEPTION_THROWN} when any step fails
+   */
+  @Override
+  public int actionUpgrade(String appName,
+      String fileName) throws IOException, YarnException {
+    try {
+      Service upgradeSpec =
+          loadAppJsonFromLocalFS(fileName, appName, null, null);
+      upgradeSpec.setState(ServiceState.UPGRADING);
+      String payload = jsonSerDeser.toJson(upgradeSpec);
+      return processResponse(
+          getApiClient().post(ClientResponse.class, payload));
+    } catch (Exception e) {
+      // Failures are reported through the exit code rather than rethrown.
+      LOG.error("Failed to upgrade application: ", e);
+      return EXIT_EXCEPTION_THROWN;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index e7979b8..59ee05d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -375,6 +375,12 @@ public class ApiServer {
           && updateServiceData.getLifetime() > 0) {
         return updateLifetime(appName, updateServiceData, ugi);
       }
+
+      // If an UPGRADE is requested
+      if (updateServiceData.getState() != null &&
+          updateServiceData.getState() == ServiceState.UPGRADING) {
+        return upgradeService(updateServiceData, ugi);
+      }
     } catch (UndeclaredThrowableException e) {
       return formatResponse(Status.BAD_REQUEST,
           e.getCause().getMessage());
@@ -475,6 +481,24 @@ public class ApiServer {
     return formatResponse(Status.OK, status);
   }
 
+  /**
+   * Initiates an upgrade of the given service on behalf of the caller.
+   * A service client is created, started and used to submit the upgrade
+   * inside the caller's security context; the client is closed whether or
+   * not the upgrade request succeeds.
+   *
+   * @param service service definition describing the version to upgrade to
+   * @param ugi user on whose behalf the upgrade is performed
+   * @return an {@code ACCEPTED} response carrying the service status
+   * @throws IOException if the privileged action fails
+   * @throws InterruptedException if the privileged action is interrupted
+   */
+  private Response upgradeService(Service service,
+      final UserGroupInformation ugi)
+      throws IOException, InterruptedException {
+    ServiceStatus status = new ServiceStatus();
+    ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+      ServiceClient sc = getServiceClient();
+      sc.init(YARN_CONFIG);
+      sc.start();
+      try {
+        sc.actionUpgrade(service);
+      } finally {
+        // Release the started client even when the upgrade request fails.
+        sc.close();
+      }
+      return null;
+    });
+    // Both placeholders need arguments, otherwise "{}" is logged literally.
+    LOG.info("Service {} version {} upgrade initialized", service.getName(),
+        service.getVersion());
+    status.setDiagnostics("Service " + service.getName() +
+        " version " + service.getVersion() + " saved.");
+    status.setState(ServiceState.ACCEPTED);
+    return formatResponse(Status.ACCEPTED, status);
+  }
+
   /**
    * Used by negative test case.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
index 516d23d..4422451 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java
@@ -23,8 +23,14 @@ import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
+
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
+
 
 import java.io.IOException;
 
@@ -37,4 +43,10 @@ public interface ClientAMProtocol {
 
   StopResponseProto stop(StopRequestProto requestProto)
       throws IOException, YarnException;
+
+  UpgradeServiceResponseProto upgrade(UpgradeServiceRequestProto request)
+      throws IOException, YarnException;
+
+  RestartServiceResponseProto restart(RestartServiceRequestProto request)
+      throws IOException, YarnException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index fb73f15..08c36f4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -33,8 +33,12 @@ import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 import org.apache.hadoop.yarn.service.component.ComponentEvent;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.slf4j.Logger;
@@ -142,4 +146,24 @@ public class ClientAMService extends AbstractService
   public InetSocketAddress getBindAddress() {
     return bindAddress;
   }
+
+  /**
+   * Handles a client request to upgrade the service by dispatching an
+   * {@link ServiceEventType#UPGRADE} event that carries the requested
+   * version.
+   *
+   * @param request upgrade request holding the target version
+   * @return an empty upgrade response
+   * @throws IOException if the request cannot be processed
+   */
+  @Override
+  public UpgradeServiceResponseProto upgrade(
+      UpgradeServiceRequestProto request) throws IOException {
+    String targetVersion = request.getVersion();
+    ServiceEvent upgradeEvent =
+        new ServiceEvent(ServiceEventType.UPGRADE).setVersion(targetVersion);
+    context.scheduler.getDispatcher().getEventHandler().handle(upgradeEvent);
+    LOG.info("Upgrading service to version {} by {}", targetVersion,
+        UserGroupInformation.getCurrentUser());
+    return UpgradeServiceResponseProto.newBuilder().build();
+  }
+
+  /**
+   * Handles a client request to restart the service by dispatching a
+   * {@link ServiceEventType#START} event.
+   *
+   * @param request restart request; its contents are not inspected here
+   * @return an empty restart response
+   * @throws IOException if the request cannot be processed
+   * @throws YarnException declared by the protocol interface
+   */
+  @Override
+  public RestartServiceResponseProto restart(
+      RestartServiceRequestProto request) throws IOException, YarnException {
+    context.scheduler.getDispatcher().getEventHandler().handle(
+        new ServiceEvent(ServiceEventType.START));
+    LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser());
+    return RestartServiceResponseProto.newBuilder().build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
new file mode 100644
index 0000000..9e7d442
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event delivered to the {@link ServiceManager} to drive changes of the
+ * service state. An event may optionally carry a service version.
+ */
+public class ServiceEvent extends AbstractEvent<ServiceEventType> {
+
+  /** Type of this event, as supplied at construction. */
+  private final ServiceEventType type;
+
+  /** Service version associated with the event; unset until assigned. */
+  private String version;
+
+  /**
+   * Creates an event of the given type.
+   *
+   * @param serviceEventType type of the event
+   */
+  public ServiceEvent(ServiceEventType serviceEventType) {
+    super(serviceEventType);
+    type = serviceEventType;
+  }
+
+  /**
+   * @return the type of this event
+   */
+  public ServiceEventType getType() {
+    return this.type;
+  }
+
+  /**
+   * @return the version attached to this event, or {@code null} if none
+   *         has been set
+   */
+  public String getVersion() {
+    return this.version;
+  }
+
+  /**
+   * Attaches a service version to this event.
+   *
+   * @param version version to attach
+   * @return this event, to allow call chaining
+   */
+  public ServiceEvent setVersion(String version) {
+    this.version = version;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
new file mode 100644
index 0000000..2162eb5
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+/**
+ * Enumerates the kinds of {@link ServiceEvent} that can be dispatched.
+ */
+public enum ServiceEventType {
+  /**
+   * Handled by {@link ServiceManager} while it is upgrading, to re-check
+   * the service state and attempt to finalize the upgrade.
+   */
+  START,
+  /**
+   * Handled by {@link ServiceManager} while it is stable, to begin an
+   * upgrade to the version carried by the event.
+   */
+  UPGRADE,
+  /**
+   * NOTE(review): no {@link ServiceManager} transition is registered for
+   * this type in this change; confirm the intended use.
+   */
+  STOP_UPGRADE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
new file mode 100644
index 0000000..a3fbe89
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.component.ComponentEvent;
+import org.apache.hadoop.yarn.service.component.ComponentEventType;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.MessageFormat;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Manages the state of the service.
+ * <p>
+ * Service events are fed through a two-state machine: an
+ * {@link ServiceEventType#UPGRADE} event received while
+ * {@link State#STABLE} starts an upgrade, and a
+ * {@link ServiceEventType#START} event received while
+ * {@link State#UPGRADING} attempts to finalize it. Event handling is
+ * serialized with a write lock.
+ */
+public class ServiceManager implements EventHandler<ServiceEvent> {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ServiceManager.class);
+
+  private final Service serviceSpec;
+  private final ServiceContext context;
+  private final ServiceScheduler scheduler;
+  private final ReentrantReadWriteLock.ReadLock readLock;
+  private final ReentrantReadWriteLock.WriteLock writeLock;
+
+  private final StateMachine<State, ServiceEventType, ServiceEvent>
+      stateMachine;
+
+  private final AsyncDispatcher dispatcher;
+  private final SliderFileSystem fs;
+  private final UpgradeComponentsFinder componentsFinder;
+
+  // Version being upgraded to; set when an upgrade starts and cleared once
+  // the upgrade has been finalized.
+  private String upgradeVersion;
+
+  private static final StateMachineFactory<ServiceManager, State,
+      ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY =
+      new StateMachineFactory<ServiceManager, State,
+          ServiceEventType, ServiceEvent>(State.STABLE)
+
+          .addTransition(State.STABLE, EnumSet.of(State.STABLE,
+              State.UPGRADING), ServiceEventType.UPGRADE,
+              new StartUpgradeTransition())
+
+          .addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
+              State.UPGRADING), ServiceEventType.START,
+              new StopUpgradeTransition())
+          .installTopology();
+
+  /**
+   * Creates a manager for the service described by the given context.
+   *
+   * @param context service context supplying the service spec, scheduler
+   *                and file system; must not be {@code null}
+   */
+  public ServiceManager(ServiceContext context) {
+    Preconditions.checkNotNull(context);
+    this.context = context;
+    serviceSpec = context.service;
+    scheduler = context.scheduler;
+    stateMachine = STATE_MACHINE_FACTORY.make(this);
+    dispatcher = scheduler.getDispatcher();
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+    fs = context.fs;
+    componentsFinder = new UpgradeComponentsFinder
+        .DefaultUpgradeComponentsFinder();
+  }
+
+  /**
+   * Runs the state machine transition for the given event. Events that are
+   * not valid in the current state are logged and otherwise ignored.
+   *
+   * @param event service event to process
+   */
+  @Override
+  public void handle(ServiceEvent event) {
+    // Acquire the lock before entering the try block so that the finally
+    // clause never unlocks a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      State oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitionException e) {
+        LOG.error(MessageFormat.format(
+            "[SERVICE]: Invalid event {0} at {1}.", event.getType(),
+            oldState), e);
+      }
+      if (oldState != getState()) {
+        LOG.info("[SERVICE] Transitioned from {} to {} on {} event.",
+            oldState, getState(), event.getType());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * @return the current state of the state machine, read under the read
+   *         lock
+   */
+  private State getState() {
+    this.readLock.lock();
+    try {
+      return this.stateMachine.getCurrentState();
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Starts an upgrade: loads the target spec for the requested version and
+   * sends an upgrade event to every component that needs upgrading.
+   */
+  private static class StartUpgradeTransition implements
+      MultipleArcTransition<ServiceManager, ServiceEvent, State> {
+
+    @Override
+    public State transition(ServiceManager serviceManager,
+        ServiceEvent event) {
+      // Remembered so the spec can be rolled back when the upgrade cannot
+      // be started; otherwise the spec would keep reporting UPGRADING while
+      // the state machine stays STABLE.
+      ServiceState previousState = serviceManager.serviceSpec.getState();
+      try {
+        Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
+            serviceManager.fs, serviceManager.getName(), event.getVersion());
+
+        serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
+        List<org.apache.hadoop.yarn.service.api.records.Component>
+            compsThatNeedUpgrade = serviceManager.componentsFinder.
+            findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec);
+
+        if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
+          compsThatNeedUpgrade.forEach(component -> {
+            ComponentEvent needUpgradeEvent = new ComponentEvent(
+                component.getName(), ComponentEventType.UPGRADE).
+                setTargetSpec(component);
+            serviceManager.dispatcher.getEventHandler().handle(
+                needUpgradeEvent);
+          });
+        }
+        serviceManager.upgradeVersion = event.getVersion();
+        return State.UPGRADING;
+      } catch (Throwable e) {
+        serviceManager.serviceSpec.setState(previousState);
+        LOG.error("[SERVICE]: Upgrade to version {} failed",
+            event.getVersion(), e);
+        return State.STABLE;
+      }
+    }
+  }
+
+  /**
+   * Re-checks the service state and, if the service spec reports stable,
+   * finalizes the pending upgrade.
+   */
+  private static class StopUpgradeTransition implements
+      MultipleArcTransition<ServiceManager, ServiceEvent, State> {
+
+    @Override
+    public State transition(ServiceManager serviceManager,
+        ServiceEvent event) {
+      //abort is not supported currently
+      //trigger re-check of service state
+      ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler,
+          true);
+      // Identity comparison is null-safe for enum values.
+      if (serviceManager.serviceSpec.getState() == ServiceState.STABLE) {
+        return serviceManager.finalizeUpgrade() ? State.STABLE :
+            State.UPGRADING;
+      } else {
+        return State.UPGRADING;
+      }
+    }
+  }
+
+  /**
+   * Overwrites the service definition with the upgrade spec and removes the
+   * upgrade directory. Failing to remove the directory is only logged.
+   *
+   * @return whether finalization of upgrade was successful.
+   */
+  private boolean finalizeUpgrade() {
+    try {
+      Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade(
+          fs, getName(), upgradeVersion);
+      ServiceApiUtil.writeAppDefinition(fs,
+          ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec);
+    } catch (IOException e) {
+      LOG.error("Upgrade did not complete because unable to overwrite the" +
+          " service definition", e);
+      return false;
+    }
+
+    try {
+      fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
+    } catch (IOException e) {
+      // Best effort: keep going, but record why the cleanup failed.
+      LOG.warn("Unable to delete upgrade definition for service {} " +
+              "version {}", getName(), upgradeVersion, e);
+    }
+    serviceSpec.setVersion(upgradeVersion);
+    upgradeVersion = null;
+    return true;
+  }
+
+  /**
+   * Returns the name of the service.
+   *
+   * @return name taken from the service spec
+   */
+  public String getName() {
+    return serviceSpec.getName();
+  }
+
+  /**
+   * State of {@link ServiceManager}.
+   */
+  public enum State {
+    STABLE, UPGRADING
+  }
+
+
+  /**
+   * @return the service spec managed by this instance
+   */
+  @VisibleForTesting
+  Service getServiceSpec() {
+    return serviceSpec;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 6333197..79eef49 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -110,6 +110,9 @@ public class ServiceScheduler extends CompositeService {
       LoggerFactory.getLogger(ServiceScheduler.class);
   private Service app;
 
+  // Wraps the <code>app</code> and provides the methods used to upgrade it.
+  private ServiceManager serviceManager;
+
   // component_name -> component
   private final Map<String, Component> componentsByName =
       new ConcurrentHashMap<>();
@@ -192,6 +195,7 @@ public class ServiceScheduler extends CompositeService {
     addIfService(nmClient);
 
     dispatcher = new AsyncDispatcher("Component  dispatcher");
+    dispatcher.register(ServiceEventType.class, new ServiceEventHandler());
     dispatcher.register(ComponentEventType.class,
         new ComponentEventHandler());
     dispatcher.register(ComponentInstanceEventType.class,
@@ -300,6 +304,7 @@ public class ServiceScheduler extends CompositeService {
 
     // Since AM has been started and registered, the service is in STARTED 
state
     app.setState(ServiceState.STARTED);
+    serviceManager = new ServiceManager(context);
 
     // recover components based on containers sent from RM
     recoverComponents(response);
@@ -510,6 +515,20 @@ public class ServiceScheduler extends CompositeService {
     }
   }
 
+  /**
+   * Forwards service events from the dispatcher to the service manager.
+   * Any failure raised while handling an event is logged, not rethrown.
+   */
+  private final class ServiceEventHandler
+      implements EventHandler<ServiceEvent> {
+    @Override
+    public void handle(ServiceEvent event) {
+      try {
+        serviceManager.handle(event);
+      } catch (Throwable t) {
+        String message = MessageFormat.format(
+            "[SERVICE]: Error in handling event type {0}", event.getType());
+        LOG.error(message, t);
+      }
+    }
+  }
+
   private final class ComponentEventHandler
       implements EventHandler<ComponentEvent> {
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
new file mode 100644
index 0000000..e18697b
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Finds the component specs of a target service definition that have to be
+ * upgraded relative to the definition the service currently runs with.
+ *
+ * Implementations may reject a target definition they cannot upgrade to.
+ */
+public interface UpgradeComponentsFinder {
+
+  /**
+   * Compares two definitions of a service and selects the components to
+   * upgrade.
+   *
+   * @param currentDef definition the service currently runs with
+   * @param targetDef  definition the service is being upgraded to
+   * @return component specs from {@code targetDef} that need an upgrade
+   */
+  List<Component> findTargetComponentSpecs(Service currentDef,
+      Service targetDef);
+
+  /**
+   * Default implementation of {@link UpgradeComponentsFinder}. Only changes
+   * to artifact, launch command and configuration are upgradable; a change
+   * to any other compared attribute, or to the number of components, is
+   * rejected with an {@link UnsupportedOperationException}.
+   */
+  class DefaultUpgradeComponentsFinder implements UpgradeComponentsFinder {
+
+    /**
+     * {@inheritDoc}
+     *
+     * A difference in service-level configuration or artifact selects every
+     * target component; otherwise only components whose own artifact,
+     * launch command or configuration differ are selected.
+     *
+     * @throws UnsupportedOperationException if the number of components or
+     *     any attribute that cannot be upgraded differs
+     */
+    @Override
+    public List<Component> findTargetComponentSpecs(Service currentDef,
+        Service targetDef) {
+      if (currentDef.getComponents().size() !=
+          targetDef.getComponents().size()) {
+        throw new UnsupportedOperationException(
+            "addition/deletion of components not supported by upgrade");
+      }
+
+      // Service-level attributes that an upgrade must leave untouched.
+      requireUnchanged(currentDef.getKerberosPrincipal(),
+          targetDef.getKerberosPrincipal(), "kerberos principal");
+      requireUnchanged(currentDef.getQueue(), targetDef.getQueue(), "queue");
+      requireUnchanged(currentDef.getPlacementPolicy(),
+          targetDef.getPlacementPolicy(), "placement policy");
+      requireUnchanged(currentDef.getResource(), targetDef.getResource(),
+          "resource");
+      requireUnchanged(currentDef.getDescription(),
+          targetDef.getDescription(), "description");
+      requireUnchanged(currentDef.getQuicklinks(), targetDef.getQuicklinks(),
+          "quick links");
+      requireUnchanged(currentDef.getLaunchTime(), targetDef.getLaunchTime(),
+          "launch time");
+      requireUnchanged(currentDef.getLifetime(), targetDef.getLifetime(),
+          "lifetime");
+
+      // Compare against targetDef: comparing currentDef with itself is always
+      // equal, so a service-level configuration change would never select
+      // all components.
+      if (!Objects.equals(currentDef.getConfiguration(),
+          targetDef.getConfiguration())) {
+        return targetDef.getComponents();
+      }
+
+      // A different service-level artifact also selects every component.
+      if (!Objects.equals(currentDef.getArtifact(), targetDef.getArtifact())) {
+        return targetDef.getComponents();
+      }
+
+      List<Component> targetComps = new ArrayList<>();
+      for (Component component : targetDef.getComponents()) {
+        Component currentComp = currentDef.getComponent(component.getName());
+        // NOTE(review): getComponent presumably returns null for a name that
+        // the current definition lacks, i.e. a renamed component - confirm.
+        // The guard reports that as a name change instead of failing on null.
+        if (currentComp == null) {
+          throw new UnsupportedOperationException(
+              "changes to component name not supported by upgrade");
+        }
+
+        // Component-level attributes that an upgrade must leave untouched.
+        requireUnchanged(currentComp.getName(), component.getName(),
+            "component name");
+        requireUnchanged(currentComp.getDependencies(),
+            component.getDependencies(), "component dependencies");
+        requireUnchanged(currentComp.getReadinessCheck(),
+            component.getReadinessCheck(), "component readiness check");
+        requireUnchanged(currentComp.getResource(), component.getResource(),
+            "component resource");
+        requireUnchanged(currentComp.getRunPrivilegedContainer(),
+            component.getRunPrivilegedContainer(),
+            "run privileged container");
+        requireUnchanged(currentComp.getPlacementPolicy(),
+            component.getPlacementPolicy(), "component placement policy");
+        requireUnchanged(currentComp.getQuicklinks(),
+            component.getQuicklinks(), "component quick links");
+
+        // A component is selected when any upgradable attribute differs.
+        if (!Objects.equals(currentComp.getArtifact(),
+            component.getArtifact()) ||
+            !Objects.equals(currentComp.getLaunchCommand(),
+                component.getLaunchCommand()) ||
+            !Objects.equals(currentComp.getConfiguration(),
+                component.getConfiguration())) {
+          targetComps.add(component);
+        }
+      }
+      return targetComps;
+    }
+
+    /**
+     * Rejects the upgrade when an attribute that cannot be changed differs.
+     *
+     * @param current value in the current definition, may be null
+     * @param target  value in the target definition, may be null
+     * @param what    attribute name used in the error message
+     * @throws UnsupportedOperationException if the two values are not equal
+     */
+    private static void requireUnchanged(Object current, Object target,
+        String what) {
+      // Objects.equals treats two nulls as equal and never throws on null.
+      if (!Objects.equals(current, target)) {
+        throw new UnsupportedOperationException(
+            "changes to " + what + " not supported by upgrade");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
index 702a9ae..f7eda7b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java
@@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceStability.Unstable
 @ApiModel(description = "The current state of a component.")
 public enum ComponentState {
-  FLEXING, STABLE
+  FLEXING, STABLE, NEEDS_UPGRADE;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
index 902a0b1..286eaa2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java
@@ -29,5 +29,5 @@ import org.apache.hadoop.classification.InterfaceStability;
 @ApiModel(description = "The current state of an service.")
 @javax.annotation.Generated(value = "class 
io.swagger.codegen.languages.JavaClientCodegen", date = 
"2016-06-02T08:15:05.615-07:00")
 public enum ServiceState {
-  ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX;
+  ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index 5731e11..04ca943 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.service.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -55,7 +56,9 @@ import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
 import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
 import org.apache.hadoop.yarn.service.ClientAMProtocol;
 import org.apache.hadoop.yarn.service.ServiceMaster;
 import org.apache.hadoop.yarn.service.api.records.Component;
@@ -73,8 +76,8 @@ import 
org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
 import org.apache.hadoop.yarn.service.provider.ProviderUtils;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.utils.ZookeeperUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
@@ -186,6 +189,7 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
     return service;
   }
 
+  @Override
   public int actionSave(String fileName, String serviceName, Long lifetime,
       String queue) throws IOException, YarnException {
     return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName,
@@ -194,9 +198,54 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
 
   public int actionBuild(Service service)
       throws YarnException, IOException {
-    Path appDir = checkAppNotExistOnHdfs(service);
     ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
-    createDirAndPersistApp(appDir, service);
+    Path appDir = checkAppNotExistOnHdfs(service, false);
+    ServiceApiUtil.createDirAndPersistApp(fs, appDir, service);
+    return EXIT_SUCCESS;
+  }
+
+  @Override
+  public int actionUpgrade(String appName, String fileName)
+      throws IOException, YarnException {
+    checkAppExistOnHdfs(appName);
+    Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
+        null, null);
+    return actionUpgrade(upgradeService);
+  }
+
+  /** Persists the target definition and asks the running AM to upgrade. */
+  public int actionUpgrade(Service service) throws YarnException, IOException {
+    Service persistedService =
+        ServiceApiUtil.loadService(fs, service.getName());
+    if (!StringUtils.isEmpty(persistedService.getId())) {
+      cachedAppInfo.put(persistedService.getName(), new AppInfo(
+          ApplicationId.fromString(persistedService.getId()),
+          persistedService.getKerberosPrincipal().getPrincipalName()));
+    }
+
+    // Null-safe comparison: either definition may carry no version.
+    if (StringUtils.equals(persistedService.getVersion(),
+        service.getVersion())) {
+      String message =
+          service.getName() + " is already at version " + service.getVersion()
+              + ". There is nothing to upgrade.";
+      LOG.error(message);
+      throw new YarnException(message);
+    }
+
+    Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true);
+    ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
+    ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
+
+    ApplicationReport appReport =
+        yarnClient.getApplicationReport(getAppId(service.getName()));
+    if (StringUtils.isEmpty(appReport.getHost())) {
+      throw new YarnException(service.getName() + " AM hostname is empty");
+    }
+    ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
+
+    proxy.upgrade(UpgradeServiceRequestProto.newBuilder()
+        .setVersion(service.getVersion()).build());
     return EXIT_SUCCESS;
   }
 
@@ -212,16 +261,16 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
     String serviceName = service.getName();
     ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
     verifyNoLiveAppInRM(serviceName, "create");
-    Path appDir = checkAppNotExistOnHdfs(service);
+    Path appDir = checkAppNotExistOnHdfs(service, false);
 
     // Write the definition first and then submit - AM will read the definition
-    createDirAndPersistApp(appDir, service);
+    ServiceApiUtil.createDirAndPersistApp(fs, appDir, service);
     ApplicationId appId = submitApp(service);
     cachedAppInfo.put(serviceName, new AppInfo(appId, service
         .getKerberosPrincipal().getPrincipalName()));
     service.setId(appId.toString());
     // update app definition with appId
-    persistAppDef(appDir, service);
+    ServiceApiUtil.writeAppDefinition(fs, appDir, service);
     return appId;
   }
 
@@ -349,6 +398,7 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
     return original;
   }
 
+  @Override
   public int actionStop(String serviceName)
       throws YarnException, IOException {
     return actionStop(serviceName, true);
@@ -424,6 +474,7 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
     return EXIT_SUCCESS;
   }
 
+  @Override
   public int actionDestroy(String serviceName) throws YarnException,
       IOException {
     ServiceApiUtil.validateNameFormat(serviceName, getConfig());
@@ -557,8 +608,8 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
     }
   }
 
-  private ApplicationId submitApp(Service app)
-      throws IOException, YarnException {
+  @VisibleForTesting
+  ApplicationId submitApp(Service app) throws IOException, YarnException {
     String serviceName = app.getName();
     Configuration conf = getConfig();
     Path appRootDir = fs.buildClusterDirPath(app.getName());
@@ -772,29 +823,64 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
     return hasAMLog4j;
   }
 
+  @Override
   public int actionStart(String serviceName) throws YarnException, IOException {
     ServiceApiUtil.validateNameFormat(serviceName, getConfig());
-    Path appDir = checkAppExistOnHdfs(serviceName);
-    Service service = ServiceApiUtil.loadService(fs, serviceName);
-    ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
-    // see if it is actually running and bail out;
-    verifyNoLiveAppInRM(serviceName, "start");
-    ApplicationId appId = submitApp(service);
-    service.setId(appId.toString());
-    // write app definition on to hdfs
-    Path appJson = persistAppDef(appDir, service);
-    LOG.info("Persisted service " + service.getName() + " at " + appJson);
-    return 0;
+    Service liveService = getStatus(serviceName);
+    // A service that is not mid-upgrade is (re)submitted from its persisted
+    // definition; one that is UPGRADING has its upgrade finalized via the AM.
+    if (liveService == null ||
+        liveService.getState() != ServiceState.UPGRADING) {
+      Path appDir = checkAppExistOnHdfs(serviceName);
+      Service service = ServiceApiUtil.loadService(fs, serviceName);
+      ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
+      // see if it is actually running and bail out;
+      verifyNoLiveAppInRM(serviceName, "start");
+      ApplicationId appId = submitApp(service);
+      service.setId(appId.toString());
+      // write app definition on to hdfs
+      Path appJson = ServiceApiUtil.writeAppDefinition(fs, appDir, service);
+      LOG.info("Persisted service " + service.getName() + " at " + appJson);
+      return 0;
+    } else {
+      LOG.info("Finalize service {} upgrade", serviceName);
+      ApplicationReport appReport =
+          yarnClient.getApplicationReport(getAppId(serviceName));
+      if (StringUtils.isEmpty(appReport.getHost())) {
+        throw new YarnException(serviceName + " AM hostname is empty");
+      }
+      ClientAMProtocol proxy = createAMProxy(serviceName, appReport);
+
+      proxy.restart(RestartServiceRequestProto.newBuilder().build());
+      return 0;
+    }
   }
 
-  private Path checkAppNotExistOnHdfs(Service service)
+  /**
+   * Verifies that the service definition does not exist on hdfs.
+   *
+   * @param service   service
+   * @param isUpgrade true for upgrades; false otherwise
+   * @return path to the service definition.
+   * @throws IOException
+   * @throws SliderException
+   */
+  private Path checkAppNotExistOnHdfs(Service service, boolean isUpgrade)
       throws IOException, SliderException {
-    Path appDir = fs.buildClusterDirPath(service.getName());
+    Path appDir = !isUpgrade ? fs.buildClusterDirPath(service.getName()) :
+        fs.buildClusterUpgradeDirPath(service.getName(), service.getVersion());
     fs.verifyDirectoryNonexistent(
         new Path(appDir, service.getName() + ".json"));
     return appDir;
   }
 
+  /**
+   * Verifies that the service exists on hdfs.
+   * @param serviceName service name
+   * @return path to the service definition.
+   * @throws IOException
+   * @throws SliderException
+   */
   private Path checkAppExistOnHdfs(String serviceName)
       throws IOException, SliderException {
     Path appDir = fs.buildClusterDirPath(serviceName);
@@ -802,20 +888,6 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
     return appDir;
   }
 
-  private void createDirAndPersistApp(Path appDir, Service service)
-      throws IOException, SliderException {
-    FsPermission appDirPermission = new FsPermission("750");
-    fs.createWithPermissions(appDir, appDirPermission);
-    Path appJson = persistAppDef(appDir, service);
-    LOG.info("Persisted service " + service.getName() + " at " + appJson);
-  }
-
-  private Path persistAppDef(Path appDir, Service service) throws IOException {
-    Path appJson = new Path(appDir, service.getName() + ".json");
-    jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
-    return appJson;
-  }
-
   private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext)
       throws IOException {
     if (!UserGroupInformation.isSecurityEnabled()) {
@@ -1074,6 +1146,17 @@ public class ServiceClient extends AppAdminClient 
implements SliderExitCodes,
         UserGroupInformation.getCurrentUser(), rpc, address);
   }
 
+  @VisibleForTesting
+  void setFileSystem(SliderFileSystem fileSystem)
+      throws IOException {
+    this.fs = fileSystem;
+  }
+
+  @VisibleForTesting
+  void setYarnClient(YarnClient yarnClient) {
+    this.yarnClient = yarnClient;
+  }
+
   public synchronized ApplicationId getAppId(String serviceName)
       throws IOException, YarnException {
     if (cachedAppInfo.containsKey(serviceName)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 3090692..0cd7e2c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -138,6 +138,12 @@ public class Component implements 
EventHandler<ComponentEvent> {
           // For flex down, go to STABLE state
           .addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
               FLEX, new FlexComponentTransition())
+          .addTransition(STABLE, UPGRADING, UPGRADE,
+              new ComponentNeedsUpgradeTransition())
+          .addTransition(FLEXING, UPGRADING, UPGRADE,
+              new ComponentNeedsUpgradeTransition())
+          .addTransition(UPGRADING, UPGRADING, UPGRADE,
+              new ComponentNeedsUpgradeTransition())
           .installTopology();
 
   public Component(
@@ -355,6 +361,14 @@ public class Component implements 
EventHandler<ComponentEvent> {
     }
   }
 
+  private static class ComponentNeedsUpgradeTransition extends BaseTransition {
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+      component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
+          records.ComponentState.NEEDS_UPGRADE);
+    }
+  }
+
   public void removePendingInstance(ComponentInstance instance) {
     pendingInstances.remove(instance);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
index 447b436..7bd5cb9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.service.component;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -32,6 +33,7 @@ public class ComponentEvent extends 
AbstractEvent<ComponentEventType> {
   private ComponentInstance instance;
   private ContainerStatus status;
   private ContainerId containerId;
+  private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
 
   public ContainerId getContainerId() {
     return containerId;
@@ -91,4 +93,14 @@ public class ComponentEvent extends 
AbstractEvent<ComponentEventType> {
     this.status = status;
     return this;
   }
+
+  public org.apache.hadoop.yarn.service.api.records.Component getTargetSpec() {
+    return targetSpec;
+  }
+
+  public ComponentEvent setTargetSpec(
+      org.apache.hadoop.yarn.service.api.records.Component targetSpec) {
+    this.targetSpec = Preconditions.checkNotNull(targetSpec);
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
index 067302d..970788a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
@@ -23,5 +23,7 @@ public enum ComponentEventType {
   CONTAINER_ALLOCATED,
   CONTAINER_RECOVERED,
   CONTAINER_STARTED,
-  CONTAINER_COMPLETED
+  CONTAINER_COMPLETED,
+  UPGRADE,
+  STOP_UPGRADE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
index a5f9ff4..0f63d03 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java
@@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.service.component;
 public enum ComponentState {
   INIT,
   FLEXING,
-  STABLE
+  STABLE,
+  UPGRADING
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
index 0378d24..7b474f6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
@@ -92,4 +92,6 @@ public interface YarnServiceConstants {
 
   String CONTENT = "content";
   String PRINCIPAL = "yarn.service.am.principal";
+
+  String UPGRADE_DIR = "upgrade";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
index 33e33a6..8152225 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java
@@ -35,8 +35,12 @@ import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
 import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
+import 
org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 
 public class ClientAMProtocolPBClientImpl
     implements ClientAMProtocol, Closeable {
@@ -88,4 +92,26 @@ public class ClientAMProtocolPBClientImpl
       RPC.stopProxy(this.proxy);
     }
   }
+
+  @Override
+  public UpgradeServiceResponseProto upgrade(
+      UpgradeServiceRequestProto request) throws IOException, YarnException {
+    try {
+      return proxy.upgradeService(null, request);
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+    }
+    return null;
+  }
+
+  @Override
+  public RestartServiceResponseProto restart(RestartServiceRequestProto request)
+      throws IOException, YarnException {
+    try {
+      return proxy.restartService(null, request);
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+    }
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
index 7100781..1a1a1ef 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
 import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
+import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
 import org.apache.hadoop.yarn.service.ClientAMProtocol;
 
 import java.io.IOException;
@@ -67,4 +71,24 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public UpgradeServiceResponseProto upgradeService(RpcController controller,
+      UpgradeServiceRequestProto request) throws ServiceException {
+    try {
+      return real.upgrade(request);
+    } catch (IOException | YarnException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public RestartServiceResponseProto restartService(RpcController controller,
+      RestartServiceRequestProto request) throws ServiceException {
+    try {
+      return real.restart(request);
+    } catch (IOException | YarnException e) {
+      throw new ServiceException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
index 284825e..5c2bac6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -112,11 +111,39 @@ public class CoreFileSystem {
   public Path buildClusterDirPath(String clustername) {
     Preconditions.checkNotNull(clustername);
     Path path = getBaseApplicationPath();
-    return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername);
+    return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/"
+        + clustername);
   }
 
+  /**
+   * Build up the upgrade path string for a cluster. No attempt to
+   * create the directory is made.
+   *
+   * @param clusterName name of the cluster
+   * @param version version of the cluster
+   * @return the upgrade path to the cluster
+   */
+  public Path buildClusterUpgradeDirPath(String clusterName, String version) {
+    Preconditions.checkNotNull(clusterName);
+    Preconditions.checkNotNull(version);
+    return new Path(buildClusterDirPath(clusterName),
+        YarnServiceConstants.UPGRADE_DIR + "/" + version);
+  }
 
   /**
+   * Delete the upgrade cluster directory.
+   * @param clusterName name of the cluster
+   * @param version     version of the cluster
+   * @throws IOException
+   */
+  public void deleteClusterUpgradeDir(String clusterName, String version)
+      throws IOException {
+    Preconditions.checkNotNull(clusterName);
+    Preconditions.checkNotNull(version);
+    Path upgradeCluster = buildClusterUpgradeDirPath(clusterName, version);
+    fileSystem.delete(upgradeCluster, true);
+  }
+  /**
    * Build up the path string for keytab install location -no attempt to
    * create the directory is made
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index 0591775..13d9a37 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -22,9 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Service;
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.service.api.records.Artifact;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Configuration;
 import org.apache.hadoop.yarn.service.api.records.Resource;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.provider.AbstractClientProvider;
 import org.apache.hadoop.yarn.service.provider.ProviderFactory;
 import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
@@ -275,6 +276,14 @@ public class ServiceApiUtil {
     return jsonSerDeser.load(fs.getFileSystem(), serviceJson);
   }
 
+  public static Service loadServiceUpgrade(SliderFileSystem fs,
+      String serviceName, String version) throws IOException {
+    Path versionPath = fs.buildClusterUpgradeDirPath(serviceName, version);
+    Path versionedDef = new Path(versionPath, serviceName + ".json");
+    LOG.info("Loading service definition from {}", versionedDef);
+    return jsonSerDeser.load(fs.getFileSystem(), versionedDef);
+  }
+
   public static Service loadServiceFrom(SliderFileSystem fs,
       Path appDefPath) throws IOException {
     LOG.info("Loading service definition from " + appDefPath);
@@ -429,6 +438,23 @@ public class ServiceApiUtil {
     return sortByDependencies(components, sortedComponents);
   }
 
+  public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir,
+      Service service)
+      throws IOException, SliderException {
+    FsPermission appDirPermission = new FsPermission("750");
+    fs.createWithPermissions(appDir, appDirPermission);
+    Path appJson = writeAppDefinition(fs, appDir, service);
+    LOG.info("Persisted service {} version {} at {}", service.getName(),
+        service.getVersion(), appJson);
+  }
+
+  public static Path writeAppDefinition(SliderFileSystem fs, Path appDir,
+      Service service) throws IOException {
+    Path appJson = new Path(appDir, service.getName() + ".json");
+    jsonSerDeser.save(fs.getFileSystem(), appJson, service, true);
+    return appJson;
+  }
+
   public static String $(String s) {
     return "${" + s +"}";
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
index 0a21c24..3677593 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto
@@ -26,6 +26,10 @@ service ClientAMProtocolService {
   rpc flexComponents(FlexComponentsRequestProto) returns (FlexComponentsResponseProto);
   rpc getStatus(GetStatusRequestProto) returns (GetStatusResponseProto);
   rpc stop(StopRequestProto) returns (StopResponseProto);
+  rpc upgradeService(UpgradeServiceRequestProto)
+    returns (UpgradeServiceResponseProto);
+  rpc restartService(RestartServiceRequestProto)
+    returns (RestartServiceResponseProto);
 }
 
 message FlexComponentsRequestProto {
@@ -37,7 +41,7 @@ message ComponentCountProto {
   optional int64 numberOfContainers = 2;
 }
 
-message FlexComponentsResponseProto{
+message FlexComponentsResponseProto {
 }
 
 message GetStatusRequestProto {
@@ -53,4 +57,17 @@ message StopRequestProto {
 
 message StopResponseProto {
 
+}
+
+message UpgradeServiceRequestProto {
+  optional string version = 1;
+}
+
+message UpgradeServiceResponseProto {
+}
+
+message RestartServiceRequestProto {
+}
+
+message RestartServiceResponseProto {
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
index 9933211..8347eb3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.service;
 
+import com.google.common.base.Throwables;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingCluster;
 import org.apache.hadoop.conf.Configuration;
@@ -26,18 +27,23 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.http.HttpServer2;
-import org.apache.hadoop.yarn.service.api.records.Service;
-import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.Resource;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.codehaus.jackson.map.PropertyNamingStrategy;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +53,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Map;
 
 import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM;
 import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC;
@@ -78,7 +86,7 @@ public class ServiceTestUtils {
 
   // Example service definition
   // 2 components, each of which has 2 containers.
-  protected Service createExampleApplication() {
+  public static Service createExampleApplication() {
     Service exampleApp = new Service();
     exampleApp.setName("example-app");
     exampleApp.setVersion("v1");
@@ -176,7 +184,7 @@ public class ServiceTestUtils {
     zkCluster.start();
     conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
     conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString());
-    LOG.info("ZK cluster: " +  zkCluster.getConnectString());
+    LOG.info("ZK cluster: " + zkCluster.getConnectString());
 
     fs = FileSystem.get(conf);
     basedir = new File("target", "apps");
@@ -268,4 +276,78 @@ public class ServiceTestUtils {
     }
   }
 
+  /**
+   * Creates a {@link ServiceClient} for test purposes.
+   */
+  public static ServiceClient createClient(Configuration conf)
+      throws Exception {
+    ServiceClient client = new ServiceClient() {
+      @Override
+      protected Path addJarResource(String appName,
+          Map<String, LocalResource> localResources)
+          throws IOException, SliderException {
+        // do nothing, the Unit test will use local jars
+        return null;
+      }
+    };
+    client.init(conf);
+    client.start();
+    return client;
+  }
+
+
+  /**
+   * Watcher that initializes the yarn service base path under target and
+   * deletes the test directory when the test finishes.
+   */
+  public static class ServiceFSWatcher extends TestWatcher {
+    private YarnConfiguration conf;
+    private SliderFileSystem fs;
+    private java.nio.file.Path serviceBasePath;
+
+    @Override
+    protected void starting(Description description) {
+      conf = new YarnConfiguration();
+      delete(description);
+      serviceBasePath = Paths.get("target",
+          description.getClassName(), description.getMethodName());
+      conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString());
+      try {
+        fs = new SliderFileSystem(conf);
+      } catch (IOException e) {
+        Throwables.propagate(e);
+      }
+    }
+
+    @Override
+    protected void finished(Description description) {
+      delete(description);
+    }
+
+    private void delete(Description description) {
+      FileUtils.deleteQuietly(Paths.get("target",
+          description.getClassName()).toFile());
+    }
+
+    /**
+     * Returns the yarn conf.
+     */
+    public YarnConfiguration getConf() {
+      return conf;
+    }
+
+    /**
+     * Returns the file system.
+     */
+    public SliderFileSystem getFs() {
+      return fs;
+    }
+
+    /**
+     * Returns the test service base path.
+     */
+    public java.nio.file.Path getServiceBasePath() {
+      return serviceBasePath;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java
new file mode 100644
index 0000000..086fdbe
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.yarn.service.api.records.Component;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link UpgradeComponentsFinder.DefaultUpgradeComponentsFinder}.
+ */
+public class TestDefaultUpgradeComponentsFinder {
+
+  private UpgradeComponentsFinder.DefaultUpgradeComponentsFinder finder =
+      new UpgradeComponentsFinder.DefaultUpgradeComponentsFinder();
+
+  @Test
+  public void testServiceArtifactChange() {
+    Service currentDef = ServiceTestUtils.createExampleApplication();
+    Service targetDef =  ServiceTestUtils.createExampleApplication();
+    targetDef.getComponents().forEach(x -> x.setArtifact(
+        TestServiceManager.createTestArtifact("v1")));
+
+    Assert.assertEquals("all components need upgrade",
+        targetDef.getComponents(), finder.findTargetComponentSpecs(currentDef,
+            targetDef));
+  }
+
+  @Test
+  public void testComponentArtifactChange() {
+    Service currentDef = TestServiceManager.createBaseDef("test");
+    Service targetDef =  TestServiceManager.createBaseDef("test");
+
+    targetDef.getComponents().get(0).setArtifact(
+        TestServiceManager.createTestArtifact("v2"));
+
+    List<Component> expected = new ArrayList<>();
+    expected.add(targetDef.getComponents().get(0));
+
+    Assert.assertEquals("single components needs upgrade",
+        expected, finder.findTargetComponentSpecs(currentDef,
+            targetDef));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
new file mode 100644
index 0000000..c65a5d4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.service.api.records.Artifact;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
+import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link ServiceManager}.
+ */
+public class TestServiceManager {
+
+  @Rule
+  public ServiceTestUtils.ServiceFSWatcher rule =
+      new ServiceTestUtils.ServiceFSWatcher();
+
+  @Test
+  public void testUpgrade() throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager("testUpgrade");
+    upgrade(serviceManager, "v2", false);
+    Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
+        serviceManager.getServiceSpec().getState());
+  }
+
+  @Test
+  public void testRestartNothingToUpgrade()
+      throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager("testRestart");
+    upgrade(serviceManager, "v2", false);
+
+    //make components stable
+    serviceManager.getServiceSpec().getComponents().forEach(comp -> {
+      comp.setState(ComponentState.STABLE);
+    });
+    serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+    Assert.assertEquals("service not re-started", ServiceState.STABLE,
+        serviceManager.getServiceSpec().getState());
+  }
+
+  @Test
+  public void testRestartWithPendingUpgrade()
+      throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager("testRestart");
+    upgrade(serviceManager, "v2", true);
+    serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+    Assert.assertEquals("service should still be upgrading",
+        ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+  }
+
+
+  private void upgrade(ServiceManager service, String version,
+      boolean upgradeArtifact)
+      throws IOException, SliderException {
+    Service upgradedDef = ServiceTestUtils.createExampleApplication();
+    upgradedDef.setName(service.getName());
+    upgradedDef.setVersion(version);
+    if (upgradeArtifact) {
+      Artifact upgradedArtifact = createTestArtifact("2");
+      upgradedDef.getComponents().forEach(component -> {
+        component.setArtifact(upgradedArtifact);
+      });
+    }
+    writeUpgradedDef(upgradedDef);
+    ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
+    upgradeEvent.setVersion("v2");
+    service.handle(upgradeEvent);
+  }
+
+  private ServiceManager createTestServiceManager(String name)
+      throws IOException {
+    ServiceContext context = new ServiceContext();
+    context.service = createBaseDef(name);
+    context.fs = rule.getFs();
+
+    context.scheduler = new ServiceScheduler(context) {
+      @Override
+      protected YarnRegistryViewForProviders createYarnRegistryOperations(
+          ServiceContext context, RegistryOperations registryClient) {
+        return mock(YarnRegistryViewForProviders.class);
+      }
+    };
+
+    context.scheduler.init(rule.getConf());
+
+    Map<String, org.apache.hadoop.yarn.service.component.Component>
+        componentState = context.scheduler.getAllComponents();
+    context.service.getComponents().forEach(component -> {
+      componentState.put(component.getName(),
+          new org.apache.hadoop.yarn.service.component.Component(component,
+              1L, context));
+    });
+    return new ServiceManager(context);
+  }
+
+  static Service createBaseDef(String name) {
+    ApplicationId applicationId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    Service serviceDef = ServiceTestUtils.createExampleApplication();
+    serviceDef.setId(applicationId.toString());
+    serviceDef.setName(name);
+    serviceDef.setState(ServiceState.STARTED);
+    Artifact artifact = createTestArtifact("1");
+
+    serviceDef.getComponents().forEach(component ->
+        component.setArtifact(artifact));
+    return serviceDef;
+  }
+
+  static Artifact createTestArtifact(String artifactId) {
+    Artifact artifact = new Artifact();
+    artifact.setId(artifactId);
+    artifact.setType(Artifact.TypeEnum.TARBALL);
+    return artifact;
+  }
+
+  private void writeUpgradedDef(Service upgradedDef)
+      throws IOException, SliderException {
+    Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
+        upgradedDef.getName(), upgradedDef.getVersion());
+    ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath,
+        upgradedDef);
+  }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to