YARN-8665. Added Yarn service cancel upgrade option. Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/913f87da Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/913f87da Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/913f87da Branch: refs/heads/HDDS-4 Commit: 913f87dada27776c539dfb352400ecf8d40e7943 Parents: e0ff8e2 Author: Eric Yang <ey...@apache.org> Authored: Wed Sep 26 14:51:35 2018 -0400 Committer: Eric Yang <ey...@apache.org> Committed: Wed Sep 26 14:51:35 2018 -0400 ---------------------------------------------------------------------- .../yarn/service/client/ApiServiceClient.java | 20 ++ .../hadoop/yarn/service/webapp/ApiServer.java | 34 +- .../hadoop/yarn/service/ClientAMProtocol.java | 5 + .../hadoop/yarn/service/ClientAMService.java | 12 + .../hadoop/yarn/service/ServiceEvent.java | 14 +- .../hadoop/yarn/service/ServiceEventType.java | 3 +- .../hadoop/yarn/service/ServiceManager.java | 331 +++++++++++++------ .../service/api/records/ContainerState.java | 2 +- .../yarn/service/api/records/ServiceState.java | 7 +- .../yarn/service/client/ServiceClient.java | 21 ++ .../yarn/service/component/Component.java | 275 ++++++++++----- .../yarn/service/component/ComponentEvent.java | 10 - .../service/component/ComponentEventType.java | 1 + .../yarn/service/component/ComponentState.java | 3 +- .../component/instance/ComponentInstance.java | 269 ++++++++++++--- .../instance/ComponentInstanceEventType.java | 3 +- .../instance/ComponentInstanceState.java | 3 +- .../containerlaunch/ContainerLaunchService.java | 4 +- .../pb/client/ClientAMProtocolPBClientImpl.java | 13 + .../service/ClientAMProtocolPBServiceImpl.java | 13 + .../yarn/service/provider/ProviderUtils.java | 9 +- .../yarn/service/utils/ServiceApiUtil.java | 14 +- .../yarn/service/utils/SliderFileSystem.java | 49 +++ .../src/main/proto/ClientAMProtocol.proto | 8 + .../yarn/service/MockRunningServiceContext.java | 20 +- .../hadoop/yarn/service/ServiceTestUtils.java | 2 +- .../hadoop/yarn/service/TestServiceManager.java | 136 ++++++-- .../yarn/service/TestYarnNativeServices.java | 44 +++ .../yarn/service/client/TestServiceCLI.java | 17 + .../yarn/service/component/TestComponent.java | 239 +++++++++++-- .../instance/TestComponentInstance.java | 177 +++++++++- .../src/test/resources/log4j.properties | 19 ++ .../hadoop/yarn/client/cli/ApplicationCLI.java | 12 +- .../hadoop/yarn/client/cli/TestYarnCLI.java | 2 + .../hadoop/yarn/client/api/AppAdminClient.java | 13 + .../containermanager/ContainerManagerImpl.java | 1 + 36 files changed, 1470 insertions(+), 335 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index ca6cc50..b7a1541 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -723,4 +723,24 @@ public class ApiServiceClient extends AppAdminClient { } return null; } + + @Override + public int actionCancelUpgrade( + String appName) throws IOException, YarnException { + int result; + try { + Service service = new Service(); + service.setName(appName); + service.setState(ServiceState.CANCEL_UPGRADING); + String buffer = jsonSerDeser.toJson(service); + LOG.info("Cancel upgrade in progress. Please wait.."); + ClientResponse response = getApiClient(getServicePath(appName)) + .put(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to cancel upgrade: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index cd6f0d7..c4e3317 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -58,6 +58,7 @@ import java.util.*; import java.util.stream.Collectors; import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED; +import static org.apache.hadoop.yarn.service.api.records.ServiceState.CANCEL_UPGRADING; import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*; import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes.*; @@ -445,6 +446,12 @@ public class ApiServer { return upgradeService(updateServiceData, ugi); } + // If CANCEL_UPGRADING is requested + if (updateServiceData.getState() != null && + updateServiceData.getState() == CANCEL_UPGRADING) { + return cancelUpgradeService(appName, ugi); + } + // If new lifetime value specified then update it if (updateServiceData.getLifetime() != null && updateServiceData.getLifetime() > 0) { @@ -460,8 +467,7 @@ public class ApiServer { LOG.error(message, e); return formatResponse(Status.NOT_FOUND, e.getMessage()); } catch (YarnException e) { - String message = "Service is not found in hdfs: " + appName; - LOG.error(message, e); + LOG.error(e.getMessage(), e); return formatResponse(Status.NOT_FOUND, e.getMessage()); } catch (Exception e) { String message = "Error while performing operation for app: " + appName; @@ -707,6 +713,27 @@ public class ApiServer { return formatResponse(Status.ACCEPTED, status); } + private Response cancelUpgradeService(String serviceName, + final UserGroupInformation ugi) throws IOException, InterruptedException { + int result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + int exitCode = sc.actionCancelUpgrade(serviceName); + sc.close(); + return exitCode; + }); + if (result == EXIT_SUCCESS) { + ServiceStatus status = new ServiceStatus(); + LOG.info("Service {} cancelling upgrade", serviceName); + status.setDiagnostics("Service " + serviceName + + " cancelling upgrade."); + status.setState(ServiceState.ACCEPTED); + return formatResponse(Status.ACCEPTED, status); + } + return Response.status(Status.BAD_REQUEST).build(); + } + private Response processComponentsUpgrade(UserGroupInformation ugi, String serviceName, Set<String> compNames) throws YarnException, IOException, InterruptedException { @@ -734,7 +761,8 @@ public class ApiServer { Service service, List<Container> containers) throws YarnException, IOException, InterruptedException { - if (service.getState() != ServiceState.UPGRADING) { + if (!service.getState().equals(ServiceState.UPGRADING) && + !service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) { throw new YarnException( String.format("The upgrade of service %s has not been initiated.", service.getName())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 652a314..39e7dfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; @@ -60,4 +62,7 @@ public interface ClientAMProtocol { GetCompInstancesResponseProto getCompInstances( GetCompInstancesRequestProto request) throws IOException, YarnException; + + CancelUpgradeResponseProto cancelUpgrade( + CancelUpgradeRequestProto request) throws IOException, YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index 2ef8f7e..47e9829 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; @@ -208,4 +210,14 @@ public class ClientAMService extends AbstractService ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray( new Container[containers.size()]))).build(); } + + @Override + public CancelUpgradeResponseProto cancelUpgrade( + CancelUpgradeRequestProto request) throws IOException, YarnException { + LOG.info("Cancel service upgrade by {}", + UserGroupInformation.getCurrentUser()); + ServiceEvent event = new ServiceEvent(ServiceEventType.CANCEL_UPGRADE); + context.scheduler.getDispatcher().getEventHandler().handle(event); + return CancelUpgradeResponseProto.newBuilder().build(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java index 3a55472..cf4455d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.service; import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.hadoop.yarn.service.api.records.Component; -import java.util.Queue; +import java.util.List; /** * Events are handled by {@link ServiceManager} to manage the service @@ -33,7 +33,8 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> { private String version; private boolean autoFinalize; private boolean expressUpgrade; - private Queue<Component> compsToUpgradeInOrder; + // For express upgrade they should be in order. + private List<Component> compsToUpgrade; public ServiceEvent(ServiceEventType serviceEventType) { super(serviceEventType); @@ -71,13 +72,12 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> { return this; } - public Queue<Component> getCompsToUpgradeInOrder() { - return compsToUpgradeInOrder; + public List<Component> getCompsToUpgrade() { + return compsToUpgrade; } - public ServiceEvent setCompsToUpgradeInOrder( - Queue<Component> compsToUpgradeInOrder) { - this.compsToUpgradeInOrder = compsToUpgradeInOrder; + public ServiceEvent setCompsToUpgrade(List<Component> compsToUpgrade) { + this.compsToUpgrade = compsToUpgrade; return this; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java index 4fc420b..03afdd3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java @@ -24,5 +24,6 @@ package org.apache.hadoop.yarn.service; public enum ServiceEventType { START, UPGRADE, - CHECK_STABLE + CHECK_STABLE, + CANCEL_UPGRADE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index 04454b1..4851325 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -20,32 +20,31 @@ package org.apache.hadoop.yarn.service; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; -import org.apache.hadoop.yarn.state.InvalidStateTransitionException; -import org.apache.hadoop.yarn.state.MultipleArcTransition; -import org.apache.hadoop.yarn.state.StateMachine; -import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.apache.hadoop.yarn.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser; @@ -71,8 +70,10 @@ public class ServiceManager implements EventHandler<ServiceEvent> { private final SliderFileSystem fs; private String upgradeVersion; - private Queue<org.apache.hadoop.yarn.service.api.records - .Component> compsToUpgradeInOrder; + private List<org.apache.hadoop.yarn.service.api.records + .Component> componentsToUpgrade; + private List<String> compsAffectedByUpgrade = new ArrayList<>(); + private String cancelledVersion; private static final StateMachineFactory<ServiceManager, State, ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY = @@ -88,11 +89,14 @@ public class ServiceManager implements EventHandler<ServiceEvent> { .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, State.UPGRADING), ServiceEventType.START, - new CheckStableTransition()) + new StartFromUpgradeTransition()) .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, State.UPGRADING), ServiceEventType.CHECK_STABLE, new CheckStableTransition()) + + .addTransition(State.UPGRADING, State.UPGRADING, + ServiceEventType.CANCEL_UPGRADE, new CancelUpgradeTransition()) .installTopology(); public ServiceManager(ServiceContext context) { @@ -148,19 +152,25 @@ public class ServiceManager implements EventHandler<ServiceEvent> { public State transition(ServiceManager serviceManager, ServiceEvent event) { serviceManager.upgradeVersion = event.getVersion(); + serviceManager.componentsToUpgrade = event.getCompsToUpgrade(); + event.getCompsToUpgrade().forEach(comp -> + serviceManager.compsAffectedByUpgrade.add(comp.getName())); try { if (event.isExpressUpgrade()) { - serviceManager.serviceSpec.setState(ServiceState.EXPRESS_UPGRADING); - serviceManager.compsToUpgradeInOrder = event - .getCompsToUpgradeInOrder(); - serviceManager.upgradeNextCompIfAny(); + serviceManager.dispatchNeedUpgradeEvents(false); + serviceManager.upgradeNextCompIfAny(false); + } else { + serviceManager.dispatchNeedUpgradeEvents(false); + } + + if (event.isExpressUpgrade()) { + serviceManager.setServiceState(ServiceState.EXPRESS_UPGRADING); } else if (event.isAutoFinalize()) { - serviceManager.serviceSpec.setState(ServiceState - .UPGRADING_AUTO_FINALIZE); + serviceManager.setServiceState(ServiceState.UPGRADING_AUTO_FINALIZE); } else { - serviceManager.serviceSpec.setState( - ServiceState.UPGRADING); + serviceManager.setServiceState(ServiceState.UPGRADING); } + return State.UPGRADING; } catch (Throwable e) { LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(), @@ -181,24 +191,32 @@ public class ServiceManager implements EventHandler<ServiceEvent> { if (currState.equals(ServiceState.STABLE)) { return State.STABLE; } - if (currState.equals(ServiceState.EXPRESS_UPGRADING)) { - org.apache.hadoop.yarn.service.api.records.Component component = - serviceManager.compsToUpgradeInOrder.peek(); - if (!component.getState().equals(ComponentState.NEEDS_UPGRADE) && - !component.getState().equals(ComponentState.UPGRADING)) { - serviceManager.compsToUpgradeInOrder.remove(); + if (currState.equals(ServiceState.EXPRESS_UPGRADING) || + currState.equals(ServiceState.CANCEL_UPGRADING)) { + + if (!serviceManager.componentsToUpgrade.isEmpty()) { + org.apache.hadoop.yarn.service.api.records.Component compSpec = + serviceManager.componentsToUpgrade.get(0); + Component component = serviceManager.scheduler.getAllComponents() + .get(compSpec.getName()); + + if (!component.isUpgrading()) { + serviceManager.componentsToUpgrade.remove(0); + serviceManager.upgradeNextCompIfAny( + currState.equals(ServiceState.CANCEL_UPGRADING)); + } } - serviceManager.upgradeNextCompIfAny(); } + if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || - event.getType().equals(ServiceEventType.START) || - (currState.equals(ServiceState.EXPRESS_UPGRADING) && - serviceManager.compsToUpgradeInOrder.isEmpty())) { + ((currState.equals(ServiceState.EXPRESS_UPGRADING) || + currState.equals(ServiceState.CANCEL_UPGRADING)) && + serviceManager.componentsToUpgrade.isEmpty())) { + ServiceState targetState = checkIfStable(serviceManager.serviceSpec); if (targetState.equals(ServiceState.STABLE)) { - if (serviceManager.finalizeUpgrade()) { - LOG.info("Service def state changed from {} -> {}", currState, - serviceManager.serviceSpec.getState()); + if (serviceManager.finalizeUpgrade( + currState.equals(ServiceState.CANCEL_UPGRADING))) { return State.STABLE; } } @@ -207,52 +225,149 @@ public class ServiceManager implements EventHandler<ServiceEvent> { } } - private void upgradeNextCompIfAny() { - if (!compsToUpgradeInOrder.isEmpty()) { + private static class StartFromUpgradeTransition implements + MultipleArcTransition<ServiceManager, ServiceEvent, State> { + + @Override + public State transition(ServiceManager serviceManager, ServiceEvent event) { + ServiceState currState = serviceManager.serviceSpec.getState(); + + ServiceState targetState = checkIfStable(serviceManager.serviceSpec); + if (targetState.equals(ServiceState.STABLE)) { + if (serviceManager.finalizeUpgrade( + currState.equals(ServiceState.CANCEL_UPGRADING))) { + return State.STABLE; + } + } + return State.UPGRADING; + } + } + + private static class CancelUpgradeTransition implements + SingleArcTransition<ServiceManager, ServiceEvent> { + + @Override + public void transition(ServiceManager serviceManager, + ServiceEvent event) { + if (!serviceManager.getState().equals(State.UPGRADING)) { + LOG.info("[SERVICE]: Cannot cancel the upgrade in {} state", + serviceManager.getState()); + return; + } + try { + Service targetSpec = ServiceApiUtil.loadService( + serviceManager.context.fs, serviceManager.getName()); + + Service sourceSpec = ServiceApiUtil.loadServiceUpgrade( + serviceManager.context.fs, serviceManager.getName(), + serviceManager.upgradeVersion); + serviceManager.cancelledVersion = serviceManager.upgradeVersion; + LOG.info("[SERVICE] cancel version {}", + serviceManager.cancelledVersion); + serviceManager.upgradeVersion = serviceManager.serviceSpec.getVersion(); + serviceManager.componentsToUpgrade = serviceManager + .resolveCompsToUpgrade(sourceSpec, targetSpec); + + serviceManager.compsAffectedByUpgrade.clear(); + serviceManager.componentsToUpgrade.forEach(comp -> + serviceManager.compsAffectedByUpgrade.add(comp.getName())); + + serviceManager.dispatchNeedUpgradeEvents(true); + serviceManager.upgradeNextCompIfAny(true); + serviceManager.setServiceState(ServiceState.CANCEL_UPGRADING); + } catch (Throwable e) { + LOG.error("[SERVICE]: Cancellation of upgrade failed", e); + } + } + } + + private void upgradeNextCompIfAny(boolean cancelUpgrade) { + if (!componentsToUpgrade.isEmpty()) { org.apache.hadoop.yarn.service.api.records.Component component = - compsToUpgradeInOrder.peek(); + componentsToUpgrade.get(0); + + serviceSpec.getComponent(component.getName()).getContainers().forEach( + container -> { + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + ContainerId.fromString(container.getId()), + !cancelUpgrade ? ComponentInstanceEventType.UPGRADE : + ComponentInstanceEventType.CANCEL_UPGRADE); + LOG.info("Upgrade container {} {}", container.getId(), + cancelUpgrade); + dispatcher.getEventHandler().handle(upgradeEvent); + }); + } + } - ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE).setTargetSpec( - component).setUpgradeVersion(upgradeVersion).setExpressUpgrade(true); - context.scheduler.getDispatcher().getEventHandler().handle( - needUpgradeEvent); + private void dispatchNeedUpgradeEvents(boolean cancelUpgrade) { + if (componentsToUpgrade != null) { + componentsToUpgrade.forEach(component -> { + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), !cancelUpgrade ? ComponentEventType.UPGRADE : + ComponentEventType.CANCEL_UPGRADE) + .setTargetSpec(component) + .setUpgradeVersion(upgradeVersion); + LOG.info("Upgrade component {} {}", component.getName(), cancelUpgrade); + context.scheduler.getDispatcher().getEventHandler() + .handle(needUpgradeEvent); + }); } } /** * @return whether finalization of upgrade was successful. */ - private boolean finalizeUpgrade() { - try { - // save the application id and state to - Service targetSpec = ServiceApiUtil.loadServiceUpgrade( - fs, getName(), upgradeVersion); - targetSpec.setId(serviceSpec.getId()); - targetSpec.setState(ServiceState.STABLE); - Map<String, Component> allComps = scheduler.getAllComponents(); - targetSpec.getComponents().forEach(compSpec -> { - Component comp = allComps.get(compSpec.getName()); - compSpec.setState(comp.getComponentSpec().getState()); - }); - jsonSerDeser.save(fs.getFileSystem(), - ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true); - fs.deleteClusterUpgradeDir(getName(), upgradeVersion); - } catch (IOException e) { - LOG.error("Upgrade did not complete because unable to re-write the" + - " service definition", e); - return false; + private boolean finalizeUpgrade(boolean cancelUpgrade) { + if (!cancelUpgrade) { + try { + // save the application id and state to + Service targetSpec = ServiceApiUtil.loadServiceUpgrade( + fs, getName(), upgradeVersion); + targetSpec.setId(serviceSpec.getId()); + targetSpec.setState(ServiceState.STABLE); + Map<String, Component> allComps = scheduler.getAllComponents(); + targetSpec.getComponents().forEach(compSpec -> { + Component comp = allComps.get(compSpec.getName()); + compSpec.setState(comp.getComponentSpec().getState()); + }); + jsonSerDeser.save(fs.getFileSystem(), + ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true); + } catch (IOException e) { + LOG.error("Upgrade did not complete because unable to re-write the" + + " service definition", e); + return false; + } } try { - fs.deleteClusterUpgradeDir(getName(), upgradeVersion); + String upgradeVersionToDel = cancelUpgrade? cancelledVersion : + upgradeVersion; + LOG.info("[SERVICE]: delete upgrade dir version {}", upgradeVersionToDel); + fs.deleteClusterUpgradeDir(getName(), upgradeVersionToDel); + + for (String comp : compsAffectedByUpgrade) { + String compDirVersionToDel = cancelUpgrade? cancelledVersion : + serviceSpec.getVersion(); + LOG.info("[SERVICE]: delete {} dir version {}", comp, + compDirVersionToDel); + fs.deleteComponentDir(compDirVersionToDel, comp); + } + + if (cancelUpgrade) { + fs.deleteComponentsVersionDirIfEmpty(cancelledVersion); + } else { + fs.deleteComponentsVersionDirIfEmpty(serviceSpec.getVersion()); + } + } catch (IOException e) { LOG.warn("Unable to delete upgrade definition for service {} " + "version {}", getName(), upgradeVersion); } - serviceSpec.setState(ServiceState.STABLE); + setServiceState(ServiceState.STABLE); serviceSpec.setVersion(upgradeVersion); upgradeVersion = null; + cancelledVersion = null; + compsAffectedByUpgrade.clear(); return true; } @@ -291,30 +406,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> { context.fs, context.service.getName(), upgradeVersion); List<org.apache.hadoop.yarn.service.api.records.Component> - compsNeedUpgradeList = componentsFinder. - findTargetComponentSpecs(context.service, targetSpec); - - // remove all components from need upgrade list if there restart policy - // doesn't all upgrade. - if (compsNeedUpgradeList != null) { - compsNeedUpgradeList.removeIf(component -> { - org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum - restartPolicy = component.getRestartPolicy(); - - final ComponentRestartPolicy restartPolicyHandler = - Component.getRestartPolicyHandler(restartPolicy); - // Do not allow upgrades for components which have NEVER/ON_FAILURE - // restart policy - if (!restartPolicyHandler.allowUpgrades()) { - LOG.info("The component {} has a restart policy that doesnt " + - "allow upgrades {} ", component.getName(), - component.getRestartPolicy().toString()); - return true; - } - - return false; - }); - } + compsNeedUpgradeList = resolveCompsToUpgrade(context.service, + targetSpec); ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE) .setVersion(upgradeVersion) @@ -334,7 +427,7 @@ public class ServiceManager implements EventHandler<ServiceEvent> { List<String> resolvedComps = ServiceApiUtil .resolveCompsDependency(targetSpec); - Queue<org.apache.hadoop.yarn.service.api.records.Component> + List<org.apache.hadoop.yarn.service.api.records.Component> orderedCompUpgrade = new LinkedList<>(); resolvedComps.forEach(compName -> { org.apache.hadoop.yarn.service.api.records.Component component = @@ -343,30 +436,68 @@ public class ServiceManager implements EventHandler<ServiceEvent> { orderedCompUpgrade.add(component); } }); - event.setCompsToUpgradeInOrder(orderedCompUpgrade); + event.setCompsToUpgrade(orderedCompUpgrade); + } else { + event.setCompsToUpgrade(compsNeedUpgradeList); } + context.scheduler.getDispatcher().getEventHandler().handle( + event); - context.scheduler.getDispatcher().getEventHandler().handle(event); - - if (compsNeedUpgradeList != null && !compsNeedUpgradeList.isEmpty()) { - if (!expressUpgrade) { - compsNeedUpgradeList.forEach(component -> { - ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE).setTargetSpec( - component).setUpgradeVersion(event.getVersion()); - context.scheduler.getDispatcher().getEventHandler().handle( - needUpgradeEvent); - - }); - } - } else if (autoFinalize) { - // nothing to upgrade if upgrade auto finalize is requested, trigger a + if (autoFinalize && (compsNeedUpgradeList == null || + compsNeedUpgradeList.isEmpty())) { + // nothing to upgrade and auto finalize is requested, trigger a // state check. context.scheduler.getDispatcher().getEventHandler().handle( new ServiceEvent(ServiceEventType.CHECK_STABLE)); } } + private List<org.apache.hadoop.yarn.service.api.records.Component> + resolveCompsToUpgrade(Service sourceSpec, Service targetSpec) { + + List<org.apache.hadoop.yarn.service.api.records.Component> + compsNeedUpgradeList = componentsFinder. + findTargetComponentSpecs(sourceSpec, targetSpec); + + // remove all components from need upgrade list if there restart policy + // doesn't all upgrade. + if (compsNeedUpgradeList != null) { + compsNeedUpgradeList.removeIf(component -> { + org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum + restartPolicy = component.getRestartPolicy(); + + final ComponentRestartPolicy restartPolicyHandler = + Component.getRestartPolicyHandler(restartPolicy); + // Do not allow upgrades for components which have NEVER/ON_FAILURE + // restart policy + if (!restartPolicyHandler.allowUpgrades()) { + LOG.info("The component {} has a restart policy that doesnt " + + "allow upgrades {} ", component.getName(), + component.getRestartPolicy().toString()); + return true; + } + + return false; + }); + } + + return compsNeedUpgradeList; + } + + /** + * Sets the state of the service in the service spec. + * @param state service state + */ + private void setServiceState( + org.apache.hadoop.yarn.service.api.records.ServiceState state) { + org.apache.hadoop.yarn.service.api.records.ServiceState curState = + serviceSpec.getState(); + if (!curState.equals(state)) { + serviceSpec.setState(state); + LOG.info("[SERVICE] spec state changed from {} -> {}", curState, state); + } + } + /** * Returns the name of the service. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java index cac527a..a6e9a2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ContainerState.java @@ -27,5 +27,5 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable public enum ContainerState { RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING, SUCCEEDED, - FAILED; + FAILED, FAILED_UPGRADE; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index 49c1985..3f2f4f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -29,5 +29,10 @@ import org.apache.hadoop.classification.InterfaceStability; @ApiModel(description = "The current state of an service.") public enum ServiceState { ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING, - UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED; + UPGRADING_AUTO_FINALIZE, EXPRESS_UPGRADING, SUCCEEDED, CANCEL_UPGRADING; + + public static boolean isUpgrading(ServiceState state) { + return state.equals(UPGRADING) || state.equals(UPGRADING_AUTO_FINALIZE) + || state.equals(EXPRESS_UPGRADING); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index a27ed87..23db57e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; @@ -353,6 +354,26 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, } @Override + public int actionCancelUpgrade(String appName) throws IOException, + YarnException { + Service liveService = getStatus(appName); + if (liveService == null || + !ServiceState.isUpgrading(liveService.getState())) { + throw new YarnException("Service " + appName + " is not upgrading, " + + "so nothing to cancel."); + } + + ApplicationReport appReport = yarnClient.getApplicationReport( + getAppId(appName)); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(appName + " AM hostname is empty"); + } + ClientAMProtocol proxy = createAMProxy(appName, appReport); + proxy.cancelUpgrade(CancelUpgradeRequestProto.newBuilder().build()); + return EXIT_SUCCESS; + } + + @Override public int actionCleanUp(String appName, String userName) throws IOException, YarnException { if (cleanUpRegistry(appName, userName)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index acf3404..526bde0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.service.component; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import static org.apache.hadoop.yarn.service.api.records.Component @@ -44,7 +43,6 @@ import org.apache.hadoop.yarn.service.ServiceEventType; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.ResourceInformation; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; -import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId; import org.apache.hadoop.yarn.service.ContainerFailureTracker; import org.apache.hadoop.yarn.service.ServiceContext; @@ -89,6 +87,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.*; import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; +import static org.apache.hadoop.yarn.service.component.ComponentEventType.CANCEL_UPGRADE; +import static org.apache.hadoop.yarn.service.component.ComponentEventType.UPGRADE; import static org.apache.hadoop.yarn.service.component.ComponentState.*; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.*; import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*; @@ -126,9 +126,8 @@ public class Component implements EventHandler<ComponentEvent> { new ConcurrentHashMap<>(); private boolean healthThresholdMonitorEnabled = false; - private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); - private ComponentEvent upgradeEvent; - private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0); + private UpgradeStatus upgradeStatus = new UpgradeStatus(); + private UpgradeStatus cancelUpgradeStatus = new UpgradeStatus(); private StateMachine<ComponentState, ComponentEventType, ComponentEvent> stateMachine; @@ -160,6 +159,8 @@ public class Component implements EventHandler<ComponentEvent> { // Flex while previous flex is still in progress .addTransition(FLEXING, EnumSet.of(FLEXING, STABLE), FLEX, new FlexComponentTransition()) + .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE), + CHECK_STABLE, new CheckStableTransition()) // container failed while stable .addTransition(STABLE, FLEXING, CONTAINER_COMPLETED, @@ -172,19 +173,28 @@ public class Component implements EventHandler<ComponentEvent> { // For flex down, go to STABLE state .addTransition(STABLE, EnumSet.of(STABLE, FLEXING), FLEX, new FlexComponentTransition()) - .addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE, - new ComponentNeedsUpgradeTransition()) - //Upgrade while previous upgrade is still in progress - .addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE, - new ComponentNeedsUpgradeTransition()) - .addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE), - CHECK_STABLE, new CheckStableTransition()) - .addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE), - CHECK_STABLE, new CheckStableTransition()) + .addTransition(STABLE, UPGRADING, UPGRADE, + new NeedsUpgradeTransition()) + .addTransition(STABLE, CANCEL_UPGRADING, CANCEL_UPGRADE, + new NeedsUpgradeTransition()) .addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE, new CheckStableTransition()) - .addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED, - new ContainerCompletedTransition()) + + // Cancel upgrade while previous upgrade is still in progress + .addTransition(UPGRADING, CANCEL_UPGRADING, + CANCEL_UPGRADE, new NeedsUpgradeTransition()) + .addTransition(UPGRADING, EnumSet.of(UPGRADING, STABLE), + CHECK_STABLE, new CheckStableTransition()) + .addTransition(UPGRADING, UPGRADING, CONTAINER_COMPLETED, + new CompletedAfterUpgradeTransition()) + + .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, FLEXING, + STABLE), CHECK_STABLE, new CheckStableTransition()) + .addTransition(CANCEL_UPGRADING, CANCEL_UPGRADING, + CONTAINER_COMPLETED, new CompletedAfterUpgradeTransition()) + .addTransition(CANCEL_UPGRADING, FLEXING, CONTAINER_ALLOCATED, + new ContainerAllocatedTransition()) + .installTopology(); public Component( @@ -332,7 +342,7 @@ public class Component implements EventHandler<ComponentEvent> { + before + " to " + event.getDesired()); component.requestContainers(delta); component.createNumCompInstances(delta); - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); component.getScheduler().getApp().setState(ServiceState.STARTED); return FLEXING; @@ -430,11 +440,11 @@ public class Component implements EventHandler<ComponentEvent> { if (component.getNumRunningInstances() + component .getNumSucceededInstances() + component.getNumFailedInstances() < component.getComponentSpec().getNumberOfContainers()) { - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); return FLEXING; } else{ - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; } @@ -444,22 +454,22 @@ public class Component implements EventHandler<ComponentEvent> { Component component) { // if desired == running if (component.componentMetrics.containersReady.value() == component - .getComponentSpec().getNumberOfContainers() - && component.numContainersThatNeedUpgrade.get() == 0) { - component.componentSpec.setState( + .getComponentSpec().getNumberOfContainers() && + !component.doesNeedUpgrade()) { + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; + } else if (component.doesNeedUpgrade()) { + component.setComponentState(org.apache.hadoop.yarn.service.api.records. + ComponentState.NEEDS_UPGRADE); + return component.getState(); } else if (component.componentMetrics.containersReady.value() != component .getComponentSpec().getNumberOfContainers()) { - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); return FLEXING; - } else { - // component.numContainersThatNeedUpgrade.get() > 0 - component.componentSpec.setState(org.apache.hadoop.yarn.service.api. - records.ComponentState.NEEDS_UPGRADE); - return UPGRADING; } + return component.getState(); } // This method should be called whenever there is an increment or decrement @@ -467,22 +477,16 @@ public class Component implements EventHandler<ComponentEvent> { //This should not matter for terminating components private static synchronized void checkAndUpdateComponentState( Component component, boolean isIncrement) { - org.apache.hadoop.yarn.service.api.records.ComponentState curState = - component.componentSpec.getState(); if (component.getRestartPolicyHandler().isLongLived()) { if (isIncrement) { // check if all containers are in READY state - if (component.numContainersThatNeedUpgrade.get() == 0 - && component.componentMetrics.containersReady.value() - == component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( + if (!component.upgradeStatus.areContainersUpgrading() && + !component.cancelUpgradeStatus.areContainersUpgrading() && + component.componentMetrics.containersReady.value() == + component.componentMetrics.containersDesired.value()) { + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); - if (curState != component.componentSpec.getState()) { - LOG.info("[COMPONENT {}] state changed from {} -> {}", - component.componentSpec.getName(), curState, - component.componentSpec.getState()); - } // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); } @@ -491,19 +495,14 @@ public class Component implements EventHandler<ComponentEvent> { // still need to verify the count before changing the component state if (component.componentMetrics.containersReady.value() < component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState .FLEXING); } else if (component.componentMetrics.containersReady.value() == component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( + component.setComponentState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); } - if (curState != component.componentSpec.getState()) { - LOG.info("[COMPONENT {}] state changed from {} -> {}", - component.componentSpec.getName(), curState, - component.componentSpec.getState()); - } // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); } @@ -511,8 +510,8 @@ public class Component implements EventHandler<ComponentEvent> { // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); } - // when the service is stable then the state of component needs to - // transition to stable + // triggers the state machine in component to reach appropriate state + // once the state in spec is changed. component.dispatcher.getEventHandler().handle( new ComponentEvent(component.getName(), ComponentEventType.CHECK_STABLE)); @@ -544,25 +543,43 @@ public class Component implements EventHandler<ComponentEvent> { } } - private static class ComponentNeedsUpgradeTransition extends BaseTransition { + private static class CompletedAfterUpgradeTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { - component.upgradeInProgress.set(true); - component.upgradeEvent = event; - component.componentSpec.setState(org.apache.hadoop.yarn.service.api. - records.ComponentState.NEEDS_UPGRADE); - component.numContainersThatNeedUpgrade.set( + Preconditions.checkNotNull(event.getContainerId()); + component.updateMetrics(event.getStatus()); + component.dispatcher.getEventHandler().handle( + new ComponentInstanceEvent(event.getContainerId(), STOP) + .setStatus(event.getStatus())); + } + } + + private static class NeedsUpgradeTransition extends BaseTransition { + @Override + public void transition(Component component, ComponentEvent event) { + boolean isCancel = event.getType().equals(CANCEL_UPGRADE); + UpgradeStatus status = !isCancel ? component.upgradeStatus : + component.cancelUpgradeStatus; + + status.inProgress.set(true); + status.targetSpec = event.getTargetSpec(); + status.targetVersion = event.getUpgradeVersion(); + LOG.info("[COMPONENT {}]: need upgrade to {}", + component.getName(), status.targetVersion); + + status.containersNeedUpgrade.set( component.componentSpec.getNumberOfContainers()); - component.componentSpec.getContainers().forEach(container -> { - container.setState(ContainerState.NEEDS_UPGRADE); - if (event.isExpressUpgrade()) { - ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( - ContainerId.fromString(container.getId()), - ComponentInstanceEventType.UPGRADE); - LOG.info("Upgrade container {}", container.getId()); - component.dispatcher.getEventHandler().handle(upgradeEvent); - } + + component.setComponentState(org.apache.hadoop.yarn.service.api. + records.ComponentState.NEEDS_UPGRADE); + + component.getAllComponentInstances().forEach(instance -> { + instance.setContainerState(ContainerState.NEEDS_UPGRADE); }); + + if (event.getType().equals(CANCEL_UPGRADE)) { + component.upgradeStatus.reset(); + } } } @@ -572,22 +589,22 @@ public class Component implements EventHandler<ComponentEvent> { @Override public ComponentState transition(Component component, ComponentEvent componentEvent) { - org.apache.hadoop.yarn.service.api.records.ComponentState currState = - component.componentSpec.getState(); - if (currState.equals(org.apache.hadoop.yarn.service.api.records - .ComponentState.STABLE)) { - return ComponentState.STABLE; - } // checkIfStable also updates the state in definition when STABLE ComponentState targetState = checkIfStable(component); - if (targetState.equals(STABLE) && component.upgradeInProgress.get()) { - component.componentSpec.overwrite( - component.upgradeEvent.getTargetSpec()); - component.upgradeEvent = null; + + if (targetState.equals(STABLE) && + !(component.upgradeStatus.isCompleted() && + component.cancelUpgradeStatus.isCompleted())) { + // Component stable after upgrade or cancel upgrade + UpgradeStatus status = !component.cancelUpgradeStatus.isCompleted() ? + component.cancelUpgradeStatus : component.upgradeStatus; + + component.componentSpec.overwrite(status.getTargetSpec()); + status.reset(); + ServiceEvent checkStable = new ServiceEvent(ServiceEventType. CHECK_STABLE); component.dispatcher.getEventHandler().handle(checkStable); - component.upgradeInProgress.set(false); } return targetState; } @@ -625,11 +642,14 @@ public class Component implements EventHandler<ComponentEvent> { "[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ", getName(), container.getId(), instance.getCompInstanceName(), container.getNodeId()); - if (upgradeInProgress.get()) { + if (!(upgradeStatus.isCompleted() && cancelUpgradeStatus.isCompleted())) { + UpgradeStatus status = !cancelUpgradeStatus.isCompleted() ? + cancelUpgradeStatus : upgradeStatus; + scheduler.getContainerLaunchService() .launchCompInstance(scheduler.getApp(), instance, container, - createLaunchContext(upgradeEvent.getTargetSpec(), - upgradeEvent.getUpgradeVersion())); + createLaunchContext(status.getTargetSpec(), + status.getTargetVersion())); } else { scheduler.getContainerLaunchService().launchCompInstance( scheduler.getApp(), instance, container, @@ -830,6 +850,12 @@ public class Component implements EventHandler<ComponentEvent> { } } + private boolean doesNeedUpgrade() { + return cancelUpgradeStatus.areContainersUpgrading() || + upgradeStatus.areContainersUpgrading() || + upgradeStatus.failed.get(); + } + public boolean areDependenciesReady() { List<String> dependencies = componentSpec.getDependencies(); if (ServiceUtils.isEmpty(dependencies)) { @@ -911,10 +937,6 @@ public class Component implements EventHandler<ComponentEvent> { } } - public void decContainersThatNeedUpgrade() { - numContainersThatNeedUpgrade.decrementAndGet(); - } - public int getNumReadyInstances() { return componentMetrics.containersReady.value(); } @@ -972,10 +994,33 @@ public class Component implements EventHandler<ComponentEvent> { } } - public ComponentEvent getUpgradeEvent() { + /** + * Returns whether a component is upgrading or not. + */ + public boolean isUpgrading() { + this.readLock.lock(); + + try { + return !(upgradeStatus.isCompleted() && + cancelUpgradeStatus.isCompleted()); + } finally { + this.readLock.unlock(); + } + } + + public UpgradeStatus getUpgradeStatus() { + this.readLock.lock(); + try { + return upgradeStatus; + } finally { + this.readLock.unlock(); + } + } + + public UpgradeStatus getCancelUpgradeStatus() { this.readLock.lock(); try { - return upgradeEvent; + return cancelUpgradeStatus; } finally { this.readLock.unlock(); } @@ -1013,6 +1058,70 @@ public class Component implements EventHandler<ComponentEvent> { } } + /** + * Sets the state of the component in the component spec. + * @param state component state + */ + private void setComponentState( + org.apache.hadoop.yarn.service.api.records.ComponentState state) { + org.apache.hadoop.yarn.service.api.records.ComponentState curState = + componentSpec.getState(); + if (!curState.equals(state)) { + componentSpec.setState(state); + LOG.info("[COMPONENT {}] spec state changed from {} -> {}", + componentSpec.getName(), curState, state); + } + } + + /** + * Status of upgrade. + */ + public static class UpgradeStatus { + private org.apache.hadoop.yarn.service.api.records.Component targetSpec; + private String targetVersion; + private AtomicBoolean inProgress = new AtomicBoolean(false); + private AtomicLong containersNeedUpgrade = new AtomicLong(0); + private AtomicBoolean failed = new AtomicBoolean(false); + + public org.apache.hadoop.yarn.service.api.records. + Component getTargetSpec() { + return targetSpec; + } + + public String getTargetVersion() { + return targetVersion; + } + + /* + * @return whether the upgrade is completed or not + */ + public boolean isCompleted() { + return !inProgress.get(); + } + + public void decContainersThatNeedUpgrade() { + if (inProgress.get()) { + containersNeedUpgrade.decrementAndGet(); + } + } + + public void containerFailedUpgrade() { + failed.set(true); + } + + void reset() { + containersNeedUpgrade.set(0); + targetSpec = null; + targetVersion = null; + inProgress.set(false); + failed.set(false); + } + + boolean areContainersUpgrading() { + return containersNeedUpgrade.get() != 0; + } + } + public ServiceContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 643961d..84caa77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -35,7 +35,6 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> { private ContainerId containerId; private org.apache.hadoop.yarn.service.api.records.Component targetSpec; private String upgradeVersion; - private boolean expressUpgrade; public ContainerId getContainerId() { return containerId; @@ -114,13 +113,4 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> { this.upgradeVersion = upgradeVersion; return this; } - - public boolean isExpressUpgrade() { - return expressUpgrade; - } - - public ComponentEvent setExpressUpgrade(boolean expressUpgrade) { - this.expressUpgrade = expressUpgrade; - return this; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java index 44d781f..d211f49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -24,6 +24,7 @@ public enum ComponentEventType { CONTAINER_RECOVERED, CONTAINER_STARTED, CONTAINER_COMPLETED, + CANCEL_UPGRADE, UPGRADE, CHECK_STABLE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java index 0f63d03..e1cd0c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java @@ -22,5 +22,6 @@ public enum ComponentState { INIT, FLEXING, STABLE, - UPGRADING + UPGRADING, + CANCEL_UPGRADING } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index afd8c67..89c9a22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -64,8 +65,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.MessageFormat; import java.util.Date; +import java.util.EnumSet; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -84,8 +87,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, LoggerFactory.getLogger(ComponentInstance.class); private static final String FAILED_BEFORE_LAUNCH_DIAG = "failed before launch"; + private static final String UPGRADE_FAILED = "upgrade failed"; - private StateMachine<ComponentInstanceState, ComponentInstanceEventType, + private StateMachine<ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent> stateMachine; private Component component; private final ReadLock readLock; @@ -106,7 +110,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; private String serviceVersion; - + private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); + private boolean pendingCancelUpgrade = false; private static final StateMachineFactory<ComponentInstance, ComponentInstanceState, ComponentInstanceEventType, @@ -132,13 +137,23 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, .addTransition(READY, STARTED, BECOME_NOT_READY, new ContainerBecomeNotReadyTransition()) .addTransition(READY, INIT, STOP, new ContainerStoppedTransition()) - .addTransition(READY, UPGRADING, UPGRADE, - new ContainerUpgradeTransition()) - .addTransition(UPGRADING, UPGRADING, UPGRADE, - new ContainerUpgradeTransition()) - .addTransition(UPGRADING, READY, BECOME_READY, - new ContainerBecomeReadyTransition()) - .addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition()) + .addTransition(READY, UPGRADING, UPGRADE, new UpgradeTransition()) + .addTransition(READY, EnumSet.of(READY, CANCEL_UPGRADING), CANCEL_UPGRADE, + new CancelUpgradeTransition()) + + // FROM UPGRADING + .addTransition(UPGRADING, EnumSet.of(READY, CANCEL_UPGRADING), + CANCEL_UPGRADE, new CancelUpgradeTransition()) + .addTransition(UPGRADING, EnumSet.of(READY), BECOME_READY, + new ReadyAfterUpgradeTransition()) + .addTransition(UPGRADING, UPGRADING, STOP, + new StoppedAfterUpgradeTransition()) + + // FROM CANCEL_UPGRADING + .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, READY), + BECOME_READY, new ReadyAfterUpgradeTransition()) + .addTransition(CANCEL_UPGRADING, EnumSet.of(CANCEL_UPGRADING, INIT), + STOP, new StoppedAfterCancelUpgradeTransition()) .installTopology(); public ComponentInstance(Component component, @@ -217,24 +232,53 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { - compInstance.containerSpec.setState(ContainerState.READY); - if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { - compInstance.component.incContainersReady(false); - compInstance.component.decContainersThatNeedUpgrade(); - compInstance.serviceVersion = compInstance.component.getUpgradeEvent() - .getUpgradeVersion(); - ComponentEvent checkState = new ComponentEvent( - compInstance.component.getName(), ComponentEventType.CHECK_STABLE); - compInstance.scheduler.getDispatcher().getEventHandler().handle( - checkState); + compInstance.setContainerState(ContainerState.READY); + compInstance.component.incContainersReady(true); + compInstance.postContainerReady(); + } + } - } else { - compInstance.component.incContainersReady(true); - } - if (compInstance.timelineServiceEnabled) { - compInstance.serviceTimelinePublisher - .componentInstanceBecomeReady(compInstance.containerSpec); + private static class ReadyAfterUpgradeTransition implements + MultipleArcTransition<ComponentInstance, ComponentInstanceEvent, + ComponentInstanceState> { + + @Override + public ComponentInstanceState transition(ComponentInstance instance, + ComponentInstanceEvent event) { + + if (instance.pendingCancelUpgrade) { + // cancellation of upgrade was triggered before the upgrade was + // finished. + LOG.info("{} received ready but cancellation pending", + event.getContainerId()); + instance.upgradeInProgress.set(true); + instance.cancelUpgrade(); + instance.pendingCancelUpgrade = false; + return instance.getState(); } + + instance.upgradeInProgress.set(false); + instance.setContainerState(ContainerState.READY); + instance.component.incContainersReady(false); + + Component.UpgradeStatus status = instance.getState().equals(UPGRADING) ? + instance.component.getUpgradeStatus() : + instance.component.getCancelUpgradeStatus(); + status.decContainersThatNeedUpgrade(); + + instance.serviceVersion = status.getTargetVersion(); + ComponentEvent checkState = new ComponentEvent( + instance.component.getName(), + ComponentEventType.CHECK_STABLE); + instance.scheduler.getDispatcher().getEventHandler().handle(checkState); + instance.postContainerReady(); + return ComponentInstanceState.READY; + } + } + + private void postContainerReady() { + if (timelineServiceEnabled) { + serviceTimelinePublisher.componentInstanceBecomeReady(containerSpec); } } @@ -242,7 +286,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { - compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY); + compInstance.setContainerState(ContainerState.RUNNING_BUT_UNREADY); compInstance.component.decContainersReady(true); } } @@ -276,11 +320,13 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, " completed. Reinsert back to pending list and requested "); builder.append("a new container.").append(System.lineSeparator()); builder.append(" exitStatus=").append( - failureBeforeLaunch ? null : event.getStatus().getExitStatus()); + failureBeforeLaunch || event.getStatus() == null ? null : + event.getStatus().getExitStatus()); builder.append(", diagnostics="); builder.append(failureBeforeLaunch ? FAILED_BEFORE_LAUNCH_DIAG : - event.getStatus().getDiagnostics()); + (event.getStatus() != null ? event.getStatus().getDiagnostics() : + UPGRADE_FAILED)); if (event.getStatus() != null && event.getStatus().getExitStatus() != 0) { LOG.error(builder.toString()); @@ -342,15 +388,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, ComponentInstanceEvent event) { Component comp = compInstance.component; + ContainerStatus status = event.getStatus(); + // status is not available when upgrade fails String containerDiag = compInstance.getCompInstanceId() + ": " + ( - failedBeforeLaunching ? - FAILED_BEFORE_LAUNCH_DIAG : - event.getStatus().getDiagnostics()); + failedBeforeLaunching ? FAILED_BEFORE_LAUNCH_DIAG : + (status != null ? status.getDiagnostics() : UPGRADE_FAILED)); compInstance.diagnostics.append(containerDiag + System.lineSeparator()); compInstance.cancelContainerStatusRetriever(); - if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) { - compInstance.component.decContainersThatNeedUpgrade(); - } + if (compInstance.getState().equals(READY)) { compInstance.component.decContainersReady(true); } @@ -387,10 +432,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, // record in ATS compInstance.scheduler.getServiceTimelinePublisher() .componentInstanceFinished(compInstance.getContainer().getId(), - failedBeforeLaunching ? - -1 : - event.getStatus().getExitStatus(), ContainerState.FAILED, - containerDiag); + failedBeforeLaunching || status == null ? -1 : + status.getExitStatus(), + ContainerState.FAILED, containerDiag); // mark other component-instances/containers as STOPPED for (ContainerId containerId : scheduler.getLiveInstances() @@ -449,28 +493,129 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, .equals(state) || ContainerState.SUCCEEDED.equals(state); } - private static class ContainerUpgradeTransition extends BaseTransition { + private static class StoppedAfterUpgradeTransition extends + BaseTransition { @Override - public void transition(ComponentInstance compInstance, + public void transition(ComponentInstance instance, ComponentInstanceEvent event) { - if (!compInstance.containerSpec.getState().equals( - ContainerState.NEEDS_UPGRADE)) { - //nothing to upgrade. this may happen with express upgrade. + instance.component.getUpgradeStatus().decContainersThatNeedUpgrade(); + instance.component.decRunningContainers(); + + final ServiceScheduler scheduler = instance.component.getScheduler(); + scheduler.getAmRMClient().releaseAssignedContainer( + event.getContainerId()); + instance.scheduler.executorService.submit( + () -> instance.cleanupRegistry(event.getContainerId())); + scheduler.removeLiveCompInstance(event.getContainerId()); + instance.component.getUpgradeStatus().containerFailedUpgrade(); + instance.setContainerState(ContainerState.FAILED_UPGRADE); + instance.upgradeInProgress.set(false); + } + } + + private static class StoppedAfterCancelUpgradeTransition implements + MultipleArcTransition<ComponentInstance, ComponentInstanceEvent, + ComponentInstanceState> { + + private ContainerStoppedTransition stoppedTransition = + new ContainerStoppedTransition(); + + @Override + public ComponentInstanceState transition(ComponentInstance instance, + ComponentInstanceEvent event) { + if (instance.pendingCancelUpgrade) { + // cancellation of upgrade was triggered before the upgrade was + // finished. + LOG.info("{} received stopped but cancellation pending", + event.getContainerId()); + instance.upgradeInProgress.set(true); + instance.cancelUpgrade(); + instance.pendingCancelUpgrade = false; + return instance.getState(); + } + + // When upgrade is cancelled, and container re-init fails + instance.component.getCancelUpgradeStatus() + .decContainersThatNeedUpgrade(); + instance.upgradeInProgress.set(false); + stoppedTransition.transition(instance, event); + return ComponentInstanceState.INIT; + } + } + + private static class UpgradeTransition extends BaseTransition { + + @Override + public void transition(ComponentInstance instance, + ComponentInstanceEvent event) { + if (!instance.component.getCancelUpgradeStatus().isCompleted()) { + // last check to see if cancellation was triggered. The component may + // have processed the cancel upgrade event but the instance doesn't know + // it yet. If cancellation has been triggered then no point in + // upgrading. return; } - compInstance.containerSpec.setState(ContainerState.UPGRADING); - compInstance.component.decContainersReady(false); - ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent(); - compInstance.scheduler.getContainerLaunchService() - .reInitCompInstance(compInstance.scheduler.getApp(), compInstance, - compInstance.container, - compInstance.component.createLaunchContext( - upgradeEvent.getTargetSpec(), - upgradeEvent.getUpgradeVersion())); + instance.upgradeInProgress.set(true); + instance.setContainerState(ContainerState.UPGRADING); + instance.component.decContainersReady(false); + + Component.UpgradeStatus status = instance.component.getUpgradeStatus(); + instance.scheduler.getContainerLaunchService() + .reInitCompInstance(instance.scheduler.getApp(), instance, + instance.container, + instance.component.createLaunchContext( + status.getTargetSpec(), + status.getTargetVersion())); } } + private static class CancelUpgradeTransition implements + MultipleArcTransition<ComponentInstance, ComponentInstanceEvent, + ComponentInstanceState> { + + @Override + public ComponentInstanceState transition(ComponentInstance instance, + ComponentInstanceEvent event) { + if (instance.upgradeInProgress.compareAndSet(false, true)) { + + Component.UpgradeStatus cancelStatus = instance.component + .getCancelUpgradeStatus(); + + if (instance.getServiceVersion().equals( + cancelStatus.getTargetVersion())) { + // previous upgrade didn't happen so just go back to READY + LOG.info("{} nothing to cancel", event.getContainerId()); + cancelStatus.decContainersThatNeedUpgrade(); + instance.setContainerState(ContainerState.READY); + ComponentEvent checkState = new ComponentEvent( + instance.component.getName(), ComponentEventType.CHECK_STABLE); + instance.scheduler.getDispatcher().getEventHandler() + .handle(checkState); + return ComponentInstanceState.READY; + } else { + instance.component.decContainersReady(false); + instance.cancelUpgrade(); + } + } else { + LOG.info("{} pending cancellation", event.getContainerId()); + instance.pendingCancelUpgrade = true; + } + return ComponentInstanceState.CANCEL_UPGRADING; + } + } + + private void cancelUpgrade() { + LOG.info("{} cancelling upgrade", container.getId()); + setContainerState(ContainerState.UPGRADING); + Component.UpgradeStatus cancelStatus = component.getCancelUpgradeStatus(); + scheduler.getContainerLaunchService() + .reInitCompInstance(scheduler.getApp(), this, + this.container, this.component.createLaunchContext( + cancelStatus.getTargetSpec(), + cancelStatus.getTargetVersion())); + } + public ComponentInstanceState getState() { this.readLock.lock(); @@ -505,6 +650,26 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, } } + /** + * Sets the state of the container in the container spec. It is write + * protected. + * + * @param state container state + */ + public void setContainerState(ContainerState state) { + this.writeLock.lock(); + try { + ContainerState curState = containerSpec.getState(); + if (!curState.equals(state)) { + containerSpec.setState(state); + LOG.info("{} spec state state changed from {} -> {}", + getCompInstanceId(), curState, state); + } + } finally { + this.writeLock.unlock(); + } + } + @Override public void handle(ComponentInstanceEvent event) { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java index 665b8fa..b9181e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceEventType.java @@ -23,5 +23,6 @@ public enum ComponentInstanceEventType { STOP, BECOME_READY, BECOME_NOT_READY, - UPGRADE + UPGRADE, + CANCEL_UPGRADE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java index f5de5cb..28cbcf5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstanceState.java @@ -22,5 +22,6 @@ public enum ComponentInstanceState { INIT, STARTED, READY, - UPGRADING + UPGRADING, + CANCEL_UPGRADING } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org