http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java index 3c856ec..153ab46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.component.ComponentEvent; @@ -113,7 +112,8 @@ public class ContainerLaunchService extends AbstractService{ .startContainerAsync(container, launcher.completeContainerLaunch()); } else { - LOG.info("reInitializing container {}", container.getId()); + LOG.info("reInitializing container {} with version {}", + container.getId(), componentLaunchContext.getServiceVersion()); instance.getComponent().getScheduler().getNmClient() .reInitializeContainerAsync(container.getId(), launcher.completeContainerLaunch(), true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java index 49ecd2e..6f37967 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java @@ -30,6 +30,8 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; @@ -141,4 +143,15 @@ public class ClientAMProtocolPBClientImpl } return null; } + + @Override + public CancelUpgradeResponseProto cancelUpgrade( + CancelUpgradeRequestProto request) throws IOException, YarnException { + try { + return proxy.cancelUpgrade(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java index eab3f9f..071c357 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.impl.pb.service; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.CancelUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; @@ -116,4 +118,15 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB { throw new ServiceException(e); } } + + @Override + public CancelUpgradeResponseProto cancelUpgrade( + RpcController controller, CancelUpgradeRequestProto request) + throws ServiceException { + try { + return real.cancelUpgrade(request); + } catch (IOException | YarnException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java index ac90992..c12c340 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java @@ -167,9 +167,8 @@ public class ProviderUtils implements YarnServiceConstants { public static Path initCompInstanceDir(SliderFileSystem fs, ContainerLaunchService.ComponentLaunchContext compLaunchContext, ComponentInstance instance) { - Path compDir = new Path(new Path(fs.getAppDir(), "components"), - compLaunchContext.getServiceVersion() + "/" + - compLaunchContext.getName()); + Path compDir = fs.getComponentDir(compLaunchContext.getServiceVersion(), + compLaunchContext.getName()); Path compInstanceDir = new Path(compDir, instance.getCompInstanceName()); instance.setCompInstanceDir(compInstanceDir); return compInstanceDir; @@ -184,7 +183,9 @@ public class ProviderUtils implements YarnServiceConstants { ServiceContext context) throws IOException { Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance); if (!fs.getFileSystem().exists(compInstanceDir)) { - log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir); + log.info("{} version {} : Creating dir on hdfs: {}", + instance.getCompInstanceId(), compLaunchContext.getServiceVersion(), + compInstanceDir); fs.getFileSystem().mkdirs(compInstanceDir, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); } else { http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index b588e88..0eb54ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -603,7 +603,7 @@ public class ServiceApiUtil { public static void validateInstancesUpgrade(List<Container> liveContainers) throws YarnException { for (Container liveContainer : liveContainers) { - if (!liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) { + if (!isUpgradable(liveContainer)) { // Nothing to upgrade throw new YarnException(String.format( ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE, @@ -613,6 +613,16 @@ public class ServiceApiUtil { } /** + * Returns whether the container can be upgraded in the current state. + */ + public static boolean isUpgradable(Container container) { + + return container.getState() != null && + (container.getState().equals(ContainerState.NEEDS_UPGRADE) || + container.getState().equals(ContainerState.FAILED_UPGRADE)); + } + + /** * Validates the components that are requested to upgrade require an upgrade. * It returns the instances of the components which need upgrade. */ @@ -629,7 +639,7 @@ public class ServiceApiUtil { ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName())); } liveComp.getContainers().forEach(liveContainer -> { - if (liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) { + if (isUpgradable(liveContainer)) { containerNeedUpgrade.add(liveContainer); } }); http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java index d6d664e..c776476 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/SliderFileSystem.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -48,4 +50,51 @@ public class SliderFileSystem extends CoreFileSystem { public Path getAppDir() { return this.appDir; } + + /** + * Returns the component directory path. + * + * @param serviceVersion service version + * @param compName component name + * @return component directory + */ + public Path getComponentDir(String serviceVersion, String compName) { + return new Path(new Path(getAppDir(), "components"), + serviceVersion + "/" + compName); + } + + /** + * Deletes the component directory. + * + * @param serviceVersion + * @param compName + * @throws IOException + */ + public void deleteComponentDir(String serviceVersion, String compName) + throws IOException { + Path path = getComponentDir(serviceVersion, compName); + if (fileSystem.exists(path)) { + fileSystem.delete(path, true); + LOG.debug("deleted dir {}", path); + } + } + + /** + * Deletes the components version directory. + * + * @param serviceVersion + * @throws IOException + */ + public void deleteComponentsVersionDirIfEmpty(String serviceVersion) + throws IOException { + Path path = new Path(new Path(getAppDir(), "components"), serviceVersion); + if (fileSystem.exists(path) && fileSystem.listStatus(path).length == 0) { + fileSystem.delete(path, true); + LOG.info("deleted dir {}", path); + } + } + + + private static final Logger LOG = LoggerFactory.getLogger( + SliderFileSystem.class); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 169f765..bcf893e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -28,6 +28,8 @@ service ClientAMProtocolService { rpc stop(StopRequestProto) returns (StopResponseProto); rpc upgradeService(UpgradeServiceRequestProto) returns (UpgradeServiceResponseProto); + rpc cancelUpgrade(CancelUpgradeRequestProto) + returns (CancelUpgradeResponseProto); rpc restartService(RestartServiceRequestProto) returns (RestartServiceResponseProto); rpc upgrade(CompInstancesUpgradeRequestProto) returns @@ -73,6 +75,12 @@ message UpgradeServiceResponseProto { optional string error = 1; } +message CancelUpgradeRequestProto { +} + +message CancelUpgradeResponseProto { +} + message RestartServiceRequestProto { } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java index 321b2cd..b685f4b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/MockRunningServiceContext.java @@ -94,21 +94,25 @@ public class MockRunningServiceContext extends ServiceContext { return mockLaunchService; } - @Override public ServiceUtils.ProcessTerminationHandler - getTerminationHandler() { + @Override + public ServiceUtils.ProcessTerminationHandler getTerminationHandler() { return new - ServiceUtils.ProcessTerminationHandler() { - public void terminate(int exitCode) { - } - }; + ServiceUtils.ProcessTerminationHandler() { + public void terminate(int exitCode) { + } + }; + } + + @Override + protected ServiceManager createServiceManager() { + return ServiceTestUtils.createServiceManager( + MockRunningServiceContext.this); } }; this.scheduler.init(fsWatcher.getConf()); - ServiceTestUtils.createServiceManager(this); - doNothing().when(mockLaunchService). reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); stabilizeComponents(this); http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 6b49ab0..58db752 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -384,6 +384,7 @@ public class ServiceTestUtils { conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString()); try { fs = new SliderFileSystem(conf); + fs.setAppDir(new Path(serviceBasePath.toString())); } catch (IOException e) { Throwables.propagate(e); } @@ -532,7 +533,6 @@ public class ServiceTestUtils { GenericTestUtils.waitFor(() -> { try { Service retrievedApp = client.getStatus(exampleApp.getName()); - System.out.println(retrievedApp); return retrievedApp.getState() == desiredState; } catch (Exception e) { e.printStackTrace(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index a37cabe..406eea4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; @@ -65,7 +66,7 @@ public class TestServiceManager { initUpgrade(context, "v2", false, false, false); ServiceManager manager = context.getServiceManager(); //make components stable by upgrading all instances - upgradeAllInstances(context); + upgradeAndReadyAllInstances(context); context.scheduler.getDispatcher().getEventHandler().handle( new ServiceEvent(ServiceEventType.START)); @@ -83,7 +84,7 @@ public class TestServiceManager { initUpgrade(context, "v2", false, true, false); ServiceManager manager = context.getServiceManager(); //make components stable by upgrading all instances - upgradeAllInstances(context); + upgradeAndReadyAllInstances(context); GenericTestUtils.waitFor(()-> context.service.getState().equals(ServiceState.STABLE), @@ -115,7 +116,7 @@ public class TestServiceManager { manager.getServiceSpec().getState()); //make components stable by upgrading all instances - upgradeAllInstances(context); + upgradeAndReadyAllInstances(context); // finalize service context.scheduler.getDispatcher().getEventHandler().handle( @@ -138,7 +139,7 @@ public class TestServiceManager { initUpgrade(context, "v2", true, true, false); // make components stable - upgradeAllInstances(context); + upgradeAndReadyAllInstances(context); GenericTestUtils.waitFor(() -> context.service.getState().equals(ServiceState.STABLE), @@ -174,18 +175,17 @@ public class TestServiceManager { public void testExpressUpgrade() throws Exception { ServiceContext context = createServiceContext("testExpressUpgrade"); ServiceManager manager = context.getServiceManager(); - manager.getServiceSpec().setState( - ServiceState.EXPRESS_UPGRADING); + manager.getServiceSpec().setState(ServiceState.EXPRESS_UPGRADING); initUpgrade(context, "v2", true, true, true); List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service); - // wait till instances of first component are in upgrade - String comp1 = comps.get(0); - upgradeInstancesOf(context, comp1); + // wait till instances of first component are upgraded and ready + String compA = comps.get(0); + makeInstancesReadyAfterUpgrade(context, compA); - // wait till instances of second component are in upgrade - String comp2 = comps.get(1); - upgradeInstancesOf(context, comp2); + // wait till instances of second component are upgraded and ready + String compB = comps.get(1); + makeInstancesReadyAfterUpgrade(context, compB); GenericTestUtils.waitFor(() -> context.service.getState().equals(ServiceState.STABLE), @@ -196,6 +196,57 @@ public class TestServiceManager { validateUpgradeFinalization(manager.getName(), "v2"); } + @Test(timeout = TIMEOUT) + public void testCancelUpgrade() throws Exception { + ServiceContext context = createServiceContext("testCancelUpgrade"); + writeInitialDef(context.service); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); + Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, + manager.getServiceSpec().getState()); + + List<String> comps = ServiceApiUtil.resolveCompsDependency(context.service); + // wait till instances of first component are upgraded and ready + String compA = comps.get(0); + // upgrade the instances + upgradeInstances(context, compA); + makeInstancesReadyAfterUpgrade(context, compA); + + // cancel upgrade + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CANCEL_UPGRADE)); + makeInstancesReadyAfterUpgrade(context, compA); + + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE, + manager.getServiceSpec().getState()); + + validateUpgradeFinalization(manager.getName(), "v1"); + } + + @Test(timeout = TIMEOUT) + public void testCancelUpgradeAfterInitiate() throws Exception { + ServiceContext context = createServiceContext("testCancelUpgrade"); + writeInitialDef(context.service); + initUpgrade(context, "v2", true, false, false); + ServiceManager manager = context.getServiceManager(); + Assert.assertEquals("service not upgrading", ServiceState.UPGRADING, + manager.getServiceSpec().getState()); + + // cancel upgrade + context.scheduler.getDispatcher().getEventHandler().handle( + new ServiceEvent(ServiceEventType.CANCEL_UPGRADE)); + GenericTestUtils.waitFor(()-> + context.service.getState().equals(ServiceState.STABLE), + CHECK_EVERY_MILLIS, TIMEOUT); + Assert.assertEquals("service upgrade not cancelled", ServiceState.STABLE, + manager.getServiceSpec().getState()); + + validateUpgradeFinalization(manager.getName(), "v1"); + } + private void validateUpgradeFinalization(String serviceName, String expectedVersion) throws IOException { Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName); @@ -225,21 +276,23 @@ public class TestServiceManager { } writeUpgradedDef(upgradedDef); serviceManager.processUpgradeRequest(version, autoFinalize, expressUpgrade); - ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); - upgradeEvent.setVersion(version).setExpressUpgrade(expressUpgrade) - .setAutoFinalize(autoFinalize); - - GenericTestUtils.waitFor(()-> { - ServiceState serviceState = context.service.getState(); - if (serviceState.equals(ServiceState.UPGRADING) || - serviceState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) || - serviceState.equals(ServiceState.EXPRESS_UPGRADING)) { - return true; + GenericTestUtils.waitFor(() -> { + for (Component comp : context.scheduler.getAllComponents().values()) { + if (!comp.getComponentSpec().getState().equals( + ComponentState.NEEDS_UPGRADE)) { + return false; + } } - return false; + return true; }, CHECK_EVERY_MILLIS, TIMEOUT); } + private void upgradeAndReadyAllInstances(ServiceContext context) throws + TimeoutException, InterruptedException { + upgradeAllInstances(context); + makeAllInstancesReady(context); + } + private void upgradeAllInstances(ServiceContext context) throws TimeoutException, InterruptedException { // upgrade the instances @@ -248,8 +301,10 @@ public class TestServiceManager { ComponentInstanceEventType.UPGRADE); context.scheduler.getDispatcher().getEventHandler().handle(event); })); + } - // become ready + private void makeAllInstancesReady(ServiceContext context) + throws TimeoutException, InterruptedException { context.scheduler.getLiveInstances().forEach(((containerId, instance) -> { ComponentInstanceEvent event = new ComponentInstanceEvent(containerId, ComponentInstanceEventType.BECOME_READY); @@ -267,7 +322,19 @@ public class TestServiceManager { }, CHECK_EVERY_MILLIS, TIMEOUT); } - private void upgradeInstancesOf(ServiceContext context, String compName) + private void upgradeInstances(ServiceContext context, String compName) { + Collection<ComponentInstance> compInstances = context.scheduler + .getAllComponents().get(compName).getAllComponentInstances(); + compInstances.forEach(instance -> { + ComponentInstanceEvent event = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.UPGRADE); + context.scheduler.getDispatcher().getEventHandler().handle(event); + }); + } + + private void makeInstancesReadyAfterUpgrade(ServiceContext context, + String compName) throws TimeoutException, InterruptedException { Collection<ComponentInstance> compInstances = context.scheduler .getAllComponents().get(compName).getAllComponentInstances(); @@ -289,6 +356,15 @@ public class TestServiceManager { context.scheduler.getDispatcher().getEventHandler().handle(event); }); + + GenericTestUtils.waitFor(() -> { + for (ComponentInstance instance : compInstances) { + if (!instance.getContainerState().equals(ContainerState.READY)) { + return false; + } + } + return true; + }, CHECK_EVERY_MILLIS, TIMEOUT); } private ServiceContext createServiceContext(String name) @@ -324,6 +400,14 @@ public class TestServiceManager { return artifact; } + private void writeInitialDef(Service service) + throws IOException, SliderException { + Path servicePath = rule.getFs().buildClusterDirPath( + service.getName()); + ServiceApiUtil.createDirAndPersistApp(rule.getFs(), servicePath, + service); + } + private void writeUpgradedDef(Service upgradedDef) throws IOException, SliderException { Path upgradePath = rule.getFs().buildClusterUpgradeDirPath( @@ -332,6 +416,6 @@ public class TestServiceManager { upgradedDef); } - private static final int TIMEOUT = 200000; + private static final int TIMEOUT = 10000; private static final int CHECK_EVERY_MILLIS = 100; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index 216d88f..3e23a10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; @@ -450,6 +451,49 @@ public class TestYarnNativeServices extends ServiceTestUtils { client.actionDestroy(service.getName()); } + @Test(timeout = 200000) + public void testCancelUpgrade() throws Exception { + setupInternal(NUM_NMS); + getConf().setBoolean(YARN_SERVICE_UPGRADE_ENABLED, true); + ServiceClient client = createClient(getConf()); + + Service service = createExampleApplication(); + Component component = service.getComponents().iterator().next(); + component.getConfiguration().getEnv().put("key1", "val0"); + + client.actionCreate(service); + waitForServiceToBeStable(client, service); + + // upgrade the service + service.setState(ServiceState.UPGRADING); + service.setVersion("v2"); + component.getConfiguration().getEnv().put("key1", "val1"); + client.initiateUpgrade(service); + + // wait for service to be in upgrade state + waitForServiceToBeInState(client, service, ServiceState.UPGRADING); + + // upgrade 1 container + Service liveService = client.getStatus(service.getName()); + Container container = liveService.getComponent(component.getName()) + .getContainers().iterator().next(); + client.actionUpgrade(service, Lists.newArrayList(container)); + + Thread.sleep(500); + // cancel the upgrade + client.actionCancelUpgrade(service.getName()); + waitForServiceToBeStable(client, service); + Service active = client.getStatus(service.getName()); + Assert.assertEquals("component not stable", ComponentState.STABLE, + active.getComponent(component.getName()).getState()); + Assert.assertEquals("comp does not have new env", "val0", + active.getComponent(component.getName()).getConfiguration() + .getEnv("key1")); + LOG.info("Stop/destroy service {}", service); + client.actionStop(service.getName(), true); + client.actionDestroy(service.getName()); + } + // Test to verify ANTI_AFFINITY placement policy // 1. Start mini cluster with 3 NMs and scheduler placement-constraint handler // 2. Create an example service with 3 containers http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java index 0e047c2..41be8c7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/client/TestServiceCLI.java @@ -221,6 +221,17 @@ public class TestServiceCLI { Assert.assertEquals(result, 0); } + @Test + public void testCancelUpgrade() throws Exception { + conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE, + DummyServiceClient.class.getName()); + cli.setConf(conf); + String[] args = {"app", "-upgrade", "app-1", + "-cancel", "-appTypes", DUMMY_APP_TYPE}; + int result = cli.run(ApplicationCLI.preProcessArgs(args)); + Assert.assertEquals(result, 0); + } + @Test (timeout = 180000) public void testEnableFastLaunch() throws Exception { fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar")) @@ -332,5 +343,11 @@ public class TestServiceCLI { throws IOException, YarnException { return ""; } + + @Override + public int actionCancelUpgrade(String appName) throws IOException, + YarnException { + return 0; + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index e1a4d9d..f11d871 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -19,8 +19,8 @@ package org.apache.hadoop.yarn.service.component; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.MockRunningServiceContext; import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceTestUtils; import org.apache.hadoop.yarn.service.TestServiceManager; @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; -import org.apache.hadoop.yarn.service.MockRunningServiceContext; import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Rule; @@ -38,9 +37,8 @@ import org.junit.Test; import java.util.Iterator; +import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.BECOME_READY; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests for {@link Component}. @@ -78,14 +76,14 @@ public class TestComponent { "val1")).setUpgradeVersion("v2")); // one instance finished upgrading - comp.decContainersThatNeedUpgrade(); + comp.getUpgradeStatus().decContainersThatNeedUpgrade(); comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CHECK_STABLE)); Assert.assertEquals("component not in need upgrade state", ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); // second instance finished upgrading - comp.decContainersThatNeedUpgrade(); + comp.getUpgradeStatus().decContainersThatNeedUpgrade(); comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CHECK_STABLE)); @@ -97,7 +95,7 @@ public class TestComponent { @Test public void testContainerCompletedWhenUpgrading() throws Exception { - String serviceName = "testContainerComplete"; + String serviceName = "testContainerCompletedWhenUpgrading"; MockRunningServiceContext context = createTestContext(rule, serviceName); Component comp = context.scheduler.getAllComponents().entrySet().iterator() .next().getValue(); @@ -105,48 +103,233 @@ public class TestComponent { comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1", "val1")).setUpgradeVersion("v2")); - comp.getAllComponentInstances().forEach(instance -> { - instance.handle(new ComponentInstanceEvent( - instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE)); - }); - Iterator<ComponentInstance> instanceIter = comp. - getAllComponentInstances().iterator(); + comp.getAllComponentInstances().forEach(instance -> + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE))); // reinitialization of a container failed - ContainerStatus status = mock(ContainerStatus.class); - when(status.getExitStatus()).thenReturn(ContainerExitStatus.ABORTED); - ComponentInstance instance = instanceIter.next(); + for(ComponentInstance instance : comp.getAllComponentInstances()) { + ComponentEvent stopEvent = new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED) + .setInstance(instance) + .setContainerId(instance.getContainer().getId()); + comp.handle(stopEvent); + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), STOP)); + } + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in needs upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + } + + @Test + public void testCancelUpgrade() throws Exception { + ServiceContext context = createTestContext(rule, "testCancelUpgrade"); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + + ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(), + ComponentEventType.CANCEL_UPGRADE); + comp.handle(upgradeEvent); + Assert.assertEquals("component not in need upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); + } + + @Test + public void testContainerCompletedCancelUpgrade() throws Exception { + String serviceName = "testContainerCompletedCancelUpgrade"; + MockRunningServiceContext context = createTestContext(rule, serviceName); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + + // upgrade completes + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1", + "val1")).setUpgradeVersion("v2")); + comp.getAllComponentInstances().forEach(instance -> + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.UPGRADE))); + + // reinitialization of a container done + for(ComponentInstance instance : comp.getAllComponentInstances()) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), BECOME_READY)); + } + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CANCEL_UPGRADE) + .setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1", + "val0")).setUpgradeVersion("v1")); + comp.getAllComponentInstances().forEach(instance -> + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE))); + + Iterator<ComponentInstance> iter = comp.getAllComponentInstances() + .iterator(); + + // cancel upgrade failed of a container + ComponentInstance instance1 = iter.next(); ComponentEvent stopEvent = new ComponentEvent(comp.getName(), ComponentEventType.CONTAINER_COMPLETED) - .setInstance(instance).setContainerId(instance.getContainer().getId()) - .setStatus(status); + .setInstance(instance1) + .setContainerId(instance1.getContainer().getId()); comp.handle(stopEvent); - instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), - STOP).setStatus(status)); + instance1.handle(new ComponentInstanceEvent( + instance1.getContainer().getId(), STOP)); + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CHECK_STABLE)); - Assert.assertEquals("component not in flexing state", - ComponentState.FLEXING, comp.getComponentSpec().getState()); - - // new container get allocated - context.assignNewContainer(context.attemptId, 10, comp); + Assert.assertEquals("component not in needs upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); // second instance finished upgrading - ComponentInstance instance2 = instanceIter.next(); + ComponentInstance instance2 = iter.next(); instance2.handle(new ComponentInstanceEvent( instance2.getContainer().getId(), ComponentInstanceEventType.BECOME_READY)); + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.CHECK_STABLE)); + Assert.assertEquals("component not in flexing state", + ComponentState.FLEXING, comp.getComponentSpec().getState()); + // new container get allocated + context.assignNewContainer(context.attemptId, 10, comp); + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + Assert.assertEquals("component not in stable state", ComponentState.STABLE, comp.getComponentSpec().getState()); - Assert.assertEquals("component did not upgrade successfully", "val1", + Assert.assertEquals("cancel upgrade failed", "val0", + comp.getComponentSpec().getConfiguration().getEnv("key1")); + } + + @Test + public void testCancelUpgradeSuccessWhileUpgrading() throws Exception { + String serviceName = "testCancelUpgradeWhileUpgrading"; + MockRunningServiceContext context = createTestContext(rule, serviceName); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + cancelUpgradeWhileUpgrading(context, comp); + + // cancel upgrade successful for both instances + for(ComponentInstance instance : comp.getAllComponentInstances()) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.BECOME_READY)); + } + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in stable state", + ComponentState.STABLE, comp.getComponentSpec().getState()); + Assert.assertEquals("cancel upgrade failed", "val0", + comp.getComponentSpec().getConfiguration().getEnv("key1")); + } + + @Test + public void testCancelUpgradeFailureWhileUpgrading() throws Exception { + String serviceName = "testCancelUpgradeFailureWhileUpgrading"; + MockRunningServiceContext context = createTestContext(rule, serviceName); + Component comp = context.scheduler.getAllComponents().entrySet().iterator() + .next().getValue(); + cancelUpgradeWhileUpgrading(context, comp); + + // cancel upgrade failed for both instances + for(ComponentInstance instance : comp.getAllComponentInstances()) { + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.STOP)); + } + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in flexing state", + ComponentState.FLEXING, comp.getComponentSpec().getState()); + + for (ComponentInstance instance : comp.getAllComponentInstances()) { + // new container get allocated + context.assignNewContainer(context.attemptId, 10, comp); + } + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in stable state", + ComponentState.STABLE, comp.getComponentSpec().getState()); + Assert.assertEquals("cancel upgrade failed", "val0", comp.getComponentSpec().getConfiguration().getEnv("key1")); } + private void cancelUpgradeWhileUpgrading( + MockRunningServiceContext context, Component comp) + throws Exception { + + comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE) + .setTargetSpec(createSpecWithEnv(context.service.getName(), + comp.getName(), "key1", "val1")).setUpgradeVersion("v0")); + + Iterator<ComponentInstance> iter = comp.getAllComponentInstances() + .iterator(); + + ComponentInstance instance1 = iter.next(); + + // instance1 is triggered to upgrade + instance1.handle(new ComponentInstanceEvent( + instance1.getContainer().getId(), ComponentInstanceEventType.UPGRADE)); + + // component upgrade is cancelled + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CANCEL_UPGRADE) + .setTargetSpec(createSpecWithEnv(context.service.getName(), + comp.getName(), "key1", + "val0")).setUpgradeVersion("v0")); + + // all instances upgrade is cancelled. + comp.getAllComponentInstances().forEach(instance -> + instance.handle(new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE))); + + // regular upgrade failed for instance 1 + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setInstance(instance1) + .setContainerId(instance1.getContainer().getId())); + instance1.handle(new ComponentInstanceEvent( + instance1.getContainer().getId(), STOP)); + + // component should be in cancel upgrade + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); + + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CHECK_STABLE)); + + Assert.assertEquals("component not in needs upgrade state", + ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState()); + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState + .CANCEL_UPGRADING, comp.getState()); + } + @Test public void testComponentStateReachesStableStateWithTerminatingComponents() throws @@ -249,8 +432,6 @@ public class TestComponent { serviceState); } - - private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java index e039981..c5a9631 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/instance/TestComponentInstance.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.service.component.instance; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -86,7 +87,7 @@ public class TestComponentInstance { @Test public void testContainerReadyAfterUpgrade() throws Exception { ServiceContext context = TestComponent.createTestContext(rule, - "testContainerStarted"); + "testContainerReadyAfterUpgrade"); Component component = context.scheduler.getAllComponents().entrySet() .iterator().next().getValue(); upgradeComponent(component); @@ -105,12 +106,186 @@ public class TestComponentInstance { .getId().toString()).getState()); } + + @Test + public void testContainerUpgradeFailed() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testContainerUpgradeFailed"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(upgradeEvent); + + ContainerStatus containerStatus = mock(ContainerStatus.class); + when(containerStatus.getExitStatus()).thenReturn( + ContainerExitStatus.ABORTED); + ComponentInstanceEvent stopEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.STOP) + .setStatus(containerStatus); + // this is the call back from NM for the upgrade + instance.handle(stopEvent); + Assert.assertEquals("instance did not fail", ContainerState.FAILED_UPGRADE, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } + + @Test + public void testCancelNothingToUpgrade() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testCancelUpgradeWhenContainerReady"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + cancelCompUpgrade(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + + ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE); + instance.handle(cancelEvent); + + Assert.assertEquals("instance not ready", ContainerState.READY, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } + + @Test + public void testCancelUpgradeFailed() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testCancelUpgradeFailed"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + cancelCompUpgrade(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + + ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE); + instance.handle(cancelEvent); + + instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(), + ComponentInstanceEventType.STOP)); + Assert.assertEquals("instance not init", ComponentInstanceState.INIT, + instance.getState()); + } + + @Test + public void testCancelAfterCompProcessedCancel() throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testCancelAfterCompProcessedCancel"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + cancelCompUpgrade(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(upgradeEvent); + + Assert.assertEquals("instance should start upgrading", + ContainerState.NEEDS_UPGRADE, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } + + @Test + public void testCancelWhileUpgradeWithSuccess() throws Exception { + validateCancelWhileUpgrading(true, true); + } + + @Test + public void testCancelWhileUpgradeWithFailure() throws Exception { + validateCancelWhileUpgrading(false, true); + } + + @Test + public void testCancelFailedWhileUpgradeWithSuccess() throws Exception { + validateCancelWhileUpgrading(true, false); + } + + @Test + public void testCancelFailedWhileUpgradeWithFailure() throws Exception { + validateCancelWhileUpgrading(false, false); + } + + private void validateCancelWhileUpgrading(boolean upgradeSuccessful, + boolean cancelUpgradeSuccessful) + throws Exception { + ServiceContext context = TestComponent.createTestContext(rule, + "testCancelWhileUpgrading"); + Component component = context.scheduler.getAllComponents().entrySet() + .iterator().next().getValue(); + upgradeComponent(component); + + ComponentInstance instance = component.getAllComponentInstances().iterator() + .next(); + ComponentInstanceEvent upgradeEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE); + instance.handle(upgradeEvent); + + Assert.assertEquals("instance should be upgrading", + ContainerState.UPGRADING, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + + cancelCompUpgrade(component); + ComponentInstanceEvent cancelEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + ComponentInstanceEventType.CANCEL_UPGRADE); + instance.handle(cancelEvent); + + // either upgrade failed or successful + ComponentInstanceEvent readyOrStopEvent = new ComponentInstanceEvent( + instance.getContainer().getId(), + upgradeSuccessful ? ComponentInstanceEventType.BECOME_READY : + ComponentInstanceEventType.STOP); + + instance.handle(readyOrStopEvent); + Assert.assertEquals("instance not upgrading", ContainerState.UPGRADING, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + + // response for cancel received + ComponentInstanceEvent readyOrStopCancel = new ComponentInstanceEvent( + instance.getContainer().getId(), + cancelUpgradeSuccessful ? ComponentInstanceEventType.BECOME_READY : + ComponentInstanceEventType.STOP); + + instance.handle(readyOrStopCancel); + if (cancelUpgradeSuccessful) { + Assert.assertEquals("instance not ready", ContainerState.READY, + component.getComponentSpec().getContainer(instance.getContainer() + .getId().toString()).getState()); + } else { + Assert.assertEquals("instance not init", ComponentInstanceState.INIT, + instance.getState()); + } + } + private void upgradeComponent(Component component) { component.handle(new ComponentEvent(component.getName(), ComponentEventType.UPGRADE).setTargetSpec(component.getComponentSpec()) .setUpgradeVersion("v2")); } + private void cancelCompUpgrade(Component component) { + component.handle(new ComponentEvent(component.getName(), + ComponentEventType.CANCEL_UPGRADE) + .setTargetSpec(component.getComponentSpec()) + .setUpgradeVersion("v1")); + } + private Component createComponent(ServiceScheduler scheduler, org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum restartPolicy, int nSucceededInstances, int nFailedInstances, http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties new file mode 100644 index 0000000..81a3f6a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/resources/log4j.properties @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# log4j configuration used during build and unit tests + +log4j.rootLogger=info,stdout +log4j.threshold=ALL +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java index a0e4e02..b0e12bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java @@ -102,6 +102,7 @@ public class ApplicationCLI extends YarnCLI { public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch"; public static final String UPGRADE_CMD = "upgrade"; public static final String UPGRADE_EXPRESS = "express"; + public static final String UPGRADE_CANCEL = "cancel"; public static final String UPGRADE_INITIATE = "initiate"; public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize"; public static final String UPGRADE_FINALIZE = "finalize"; @@ -265,6 +266,8 @@ public class ApplicationCLI extends YarnCLI { opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " + "-initiate options to initiate the upgrade of the application with " + "the ability to finalize the upgrade automatically."); + opts.addOption(UPGRADE_CANCEL, false, "Works with -upgrade option to " + + "cancel current upgrade."); opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name"); opts.getOption(LAUNCH_CMD).setArgs(2); opts.getOption(START_CMD).setArgName("Application Name"); @@ -646,7 +649,7 @@ public class ApplicationCLI extends YarnCLI { } else if (cliParser.hasOption(UPGRADE_CMD)) { if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_EXPRESS, UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, - COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) { + UPGRADE_CANCEL, COMPONENT_INSTS, COMPONENTS, APP_TYPE_CMD)) { printUsage(title, opts); return exitCode; } @@ -697,6 +700,13 @@ public class ApplicationCLI extends YarnCLI { return exitCode; } return client.actionStart(appName); + } else if (cliParser.hasOption(UPGRADE_CANCEL)) { + if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, + UPGRADE_CANCEL, APP_TYPE_CMD)) { + printUsage(title, opts); + return exitCode; + } + return client.actionCancelUpgrade(appName); } } else { syserr.println("Invalid Command Usage : "); http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java index a600895..f795db5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java @@ -2143,6 +2143,8 @@ public class TestYarnCLI { pw.println(" the upgrade of the application"); pw.println(" with the ability to finalize the"); pw.println(" upgrade automatically."); + pw.println(" -cancel Works with -upgrade option to"); + pw.println(" cancel current upgrade."); pw.println(" -changeQueue <Queue Name> Moves application to a new"); pw.println(" queue. ApplicationId can be"); pw.println(" passed using 'appId' option."); http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java index 232666d..df11ffd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -300,4 +300,17 @@ public abstract class AppAdminClient extends CompositeService { @Unstable public abstract int actionUpgradeExpress(String appName, File fileName) throws IOException, YarnException; + + /** + * Cancels the upgrade of the service. + * + * @param appName the name of the application + * @return exit code + * @throws IOException + * @throws YarnException + */ + @Public + @Unstable + public abstract int actionCancelUpgrade(String appName) throws IOException, + YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/913f87da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 27a7c80..01d70af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -1836,6 +1836,7 @@ public class ContainerManagerImpl extends CompositeService implements public void reInitializeContainer(ContainerId containerId, ContainerLaunchContext reInitLaunchContext, boolean autoCommit) throws YarnException { + LOG.debug("{} requested reinit", containerId); Container container = preReInitializeOrLocalizeCheck(containerId, ReInitOp.RE_INIT); ResourceSet resourceSet = new ResourceSet(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org