YARN-8080. Add restart policy for YARN services. Contributed by Suma Shivaprasad
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7f083ed8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7f083ed8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7f083ed8 Branch: refs/heads/trunk Commit: 7f083ed8699a720d3fb82e4ec310356902a6ac30 Parents: 7802af6 Author: Eric Yang <ey...@apache.org> Authored: Thu May 17 17:16:50 2018 -0400 Committer: Eric Yang <ey...@apache.org> Committed: Thu May 17 17:16:50 2018 -0400 ---------------------------------------------------------------------- ...RN-Simplified-V1-API-Layer-For-Services.yaml | 8 + .../hadoop/yarn/service/ClientAMService.java | 7 +- .../hadoop/yarn/service/ServiceContext.java | 4 + .../hadoop/yarn/service/ServiceManager.java | 25 +- .../hadoop/yarn/service/ServiceScheduler.java | 73 ++- .../yarn/service/api/records/Component.java | 71 +++ .../service/component/AlwaysRestartPolicy.java | 82 ++++ .../yarn/service/component/Component.java | 209 ++++++-- .../component/ComponentRestartPolicy.java | 45 ++ .../service/component/NeverRestartPolicy.java | 82 ++++ .../component/OnFailureRestartPolicy.java | 87 ++++ .../component/instance/ComponentInstance.java | 91 +++- .../hadoop/yarn/service/utils/ServiceUtils.java | 18 + .../hadoop/yarn/service/ServiceTestUtils.java | 46 +- .../hadoop/yarn/service/TestServiceManager.java | 6 +- .../yarn/service/component/TestComponent.java | 99 +++- .../component/TestComponentRestartPolicy.java | 130 +++++ .../instance/TestComponentInstance.java | 484 ++++++++++++++++++- .../markdown/yarn-service/YarnServiceAPI.md | 2 + 19 files changed, 1447 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml index cea8296..d90ae06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml @@ -424,6 +424,14 @@ definitions: items: type: string description: A list of quicklink keys defined at the service level, and to be resolved by this component. + restartPolicy: + type: string + description: Policy of restart component. Including ALWAYS (Always restart component even if instance exit code = 0); ON_FAILURE (Only restart component if instance exit code != 0); NEVER (Do not restart in any cases) + enum: + - ALWAYS + - ON_FAILURE + - NEVER + default: ALWAYS ReadinessCheck: description: A check to be performed to determine the readiness of a component instance (a container). If no readiness check is specified, the default readiness check will be used unless the yarn.service.default-readiness-check.enabled configuration property is set to false at the component, service, or system level. The artifact field is currently unsupported but may be implemented in the future, enabling a pluggable helper container to support advanced use cases. 
required: http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index d5d6fa4..e97c3d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto; @@ -130,7 +131,7 @@ public class ClientAMService extends AbstractService LOG.info("Stop the service by {}", UserGroupInformation.getCurrentUser()); context.scheduler.getDiagnostics() .append("Stopped by user " + UserGroupInformation.getCurrentUser()); - context.scheduler.setGracefulStop(); + context.scheduler.setGracefulStop(FinalApplicationStatus.ENDED); // Stop the service in 2 seconds delay to make sure this rpc call is completed. 
// shutdown hook will be executed which will stop AM gracefully. @@ -157,10 +158,10 @@ public class ClientAMService extends AbstractService public UpgradeServiceResponseProto upgrade( UpgradeServiceRequestProto request) throws IOException { try { - context.getServiceManager().processUpgradeRequest(request.getVersion(), - request.getAutoFinalize()); LOG.info("Upgrading service to version {} by {}", request.getVersion(), UserGroupInformation.getCurrentUser()); + context.getServiceManager().processUpgradeRequest(request.getVersion(), + request.getAutoFinalize()); return UpgradeServiceResponseProto.newBuilder().build(); } catch (Exception ex) { return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java index 6c91b9c..8779153 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java @@ -56,4 +56,8 @@ public class ServiceContext { void setServiceManager(ServiceManager serviceManager) { this.serviceManager = Preconditions.checkNotNull(serviceManager); } + + public Service getService() { + return service; + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java index e6a38dc..05ecb3f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.component.Component; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; @@ -266,12 +267,24 @@ public class ServiceManager implements EventHandler<ServiceEvent> { event.setAutoFinalize(true); } compsThatNeedUpgrade.forEach(component -> { - ComponentEvent needUpgradeEvent = new ComponentEvent( - component.getName(), ComponentEventType.UPGRADE) - .setTargetSpec(component) - .setUpgradeVersion(event.getVersion()); - context.scheduler.getDispatcher().getEventHandler().handle( - needUpgradeEvent); + 
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum + restartPolicy = component.getRestartPolicy(); + + final ComponentRestartPolicy restartPolicyHandler = + Component.getRestartPolicyHandler(restartPolicy); + // Do not allow upgrades for components which have NEVER/ON_FAILURE + // restart policy + if (restartPolicyHandler.allowUpgrades()) { + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), ComponentEventType.UPGRADE).setTargetSpec( + component).setUpgradeVersion(event.getVersion()); + context.scheduler.getDispatcher().getEventHandler().handle( + needUpgradeEvent); + } else { + LOG.info("The component {} has a restart " + + "policy that doesnt allow upgrades {} ", component.getName(), + component.getRestartPolicy().toString()); + } }); } else { // nothing to upgrade if upgrade auto finalize is requested, trigger a http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index ee0a1a7..d3e8e4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.service.api.ServiceApiConstants; 
import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.api.records.ConfigFile; +import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; @@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.service.timelineservice.ServiceMetricsSink; import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; +import org.apache.hadoop.yarn.service.utils.ServiceUtils; import org.apache.hadoop.yarn.util.BoundedAppender; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; @@ -89,8 +91,10 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -101,6 +105,10 @@ import static org.apache.hadoop.registry.client.api.RegistryConstants.*; import static org.apache.hadoop.yarn.api.records.ContainerExitStatus.KILLED_AFTER_APP_COMPLETION; import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*; import static org.apache.hadoop.yarn.service.component.ComponentEventType.*; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes + .EXIT_FALSE; +import static org.apache.hadoop.yarn.service.exceptions.LauncherExitCodes + .EXIT_SUCCESS; /** * @@ -158,8 +166,15 @@ public class ServiceScheduler extends CompositeService { private boolean gracefulStop = 
false; + private volatile FinalApplicationStatus finalApplicationStatus = + FinalApplicationStatus.ENDED; + + // For unit test override since we don't want to terminate UT process. + private ServiceUtils.ProcessTerminationHandler + terminationHandler = new ServiceUtils.ProcessTerminationHandler(); + public ServiceScheduler(ServiceContext context) { - super(context.service.getName()); + super(context.getService().getName()); this.context = context; } @@ -256,8 +271,9 @@ public class ServiceScheduler extends CompositeService { .createAMRMClientAsync(1000, new AMRMClientCallback()); } - protected void setGracefulStop() { + public void setGracefulStop(FinalApplicationStatus applicationStatus) { this.gracefulStop = true; + this.finalApplicationStatus = applicationStatus; nmClient.getClient().cleanupRunningContainersOnStop(true); } @@ -877,4 +893,57 @@ public class ServiceScheduler extends CompositeService { public boolean hasAtLeastOnePlacementConstraint() { return hasAtLeastOnePlacementConstraint; } + + /* +* Check if all components of the scheduler finished. +* If all components finished +* (which #failed-instances + #suceeded-instances = #total-n-containers) +* The service will be terminated. +*/ + public synchronized void terminateServiceIfAllComponentsFinished() { + boolean shouldTerminate = true; + + // Succeeded comps and failed comps, for logging purposes. + Set<String> succeededComponents = new HashSet<>(); + Set<String> failedComponents = new HashSet<>(); + + for (Component comp : getAllComponents().values()) { + ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); + if (!restartPolicy.shouldTerminate(comp)) { + shouldTerminate = false; + break; + } + + long nFailed = comp.getNumFailedInstances(); + + if (nFailed > 0) { + failedComponents.add(comp.getName()); + } else{ + succeededComponents.add(comp.getName()); + } + } + + if (shouldTerminate) { + LOG.info("All component finished, exiting Service Master... 
" + + ", final status=" + (failedComponents.isEmpty() ? + "Succeeded" : + "Failed")); + LOG.info("Succeeded components: [" + org.apache.commons.lang3.StringUtils + .join(succeededComponents, ",") + "]"); + LOG.info("Failed components: [" + org.apache.commons.lang3.StringUtils + .join(failedComponents, ",") + "]"); + + if (failedComponents.isEmpty()) { + setGracefulStop(FinalApplicationStatus.SUCCEEDED); + getTerminationHandler().terminate(EXIT_SUCCESS); + } else{ + setGracefulStop(FinalApplicationStatus.FAILED); + getTerminationHandler().terminate(EXIT_FALSE); + } + } + } + + public ServiceUtils.ProcessTerminationHandler getTerminationHandler() { + return terminationHandler; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java index 7deb076..0481123 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Component.java @@ -17,6 +17,7 @@ package org.apache.hadoop.yarn.service.api.records; +import com.fasterxml.jackson.annotation.JsonValue; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -29,7 +30,9 @@ import 
java.util.Objects; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlEnum; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlType; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; @@ -98,6 +101,74 @@ public class Component implements Serializable { private List<Container> containers = Collections.synchronizedList(new ArrayList<Container>()); + + @JsonProperty("restart_policy") + @XmlElement(name = "restart_policy") + private RestartPolicyEnum restartPolicy = RestartPolicyEnum.ALWAYS; + + /** + * Policy of restart component. Including ALWAYS - Long lived components + * (Always restart component instance even if instance exit code = 0.); + * + * ON_FAILURE (Only restart component instance if instance exit code != + * 0); + * NEVER (Do not restart in any cases) + * + * @return restartPolicy + **/ + @XmlType(name = "restart_policy") + @XmlEnum + public enum RestartPolicyEnum { + ALWAYS("ALWAYS"), + + ON_FAILURE("ON_FAILURE"), + + NEVER("NEVER"); + private String value; + + RestartPolicyEnum(String value) { + this.value = value; + } + + @Override + @JsonValue + public String toString() { + return value; + } + } + + public Component restartPolicy(RestartPolicyEnum restartPolicyEnumVal) { + this.restartPolicy = restartPolicyEnumVal; + return this; + } + + /** + * Policy of restart component. + * + * Including + * ALWAYS (Always restart component instance even if instance exit + * code = 0); + * + * ON_FAILURE (Only restart component instance if instance exit code != + * 0); + * + * NEVER (Do not restart in any cases) + * + * @return restartPolicy + **/ + @ApiModelProperty(value = "Policy of restart component. 
Including ALWAYS " + + "(Always restart component even if instance exit code = 0); " + + "ON_FAILURE (Only restart component if instance exit code != 0); " + + "NEVER (Do not restart in any cases)") + public RestartPolicyEnum getRestartPolicy() { + return restartPolicy; + } + + public void setRestartPolicy(RestartPolicyEnum restartPolicy) { + this.restartPolicy = restartPolicy; + } + + /** * Name of the service component (mandatory). **/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java new file mode 100644 index 0000000..704ab14 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/AlwaysRestartPolicy.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +/** + * Always restart policy allows for restarts for long live components which + * never terminate. + */ +public final class AlwaysRestartPolicy implements ComponentRestartPolicy { + + private static AlwaysRestartPolicy INSTANCE = new AlwaysRestartPolicy(); + + private AlwaysRestartPolicy() { + } + + public static AlwaysRestartPolicy getInstance() { + return INSTANCE; + } + + @Override public boolean isLongLived() { + return true; + } + + /** + * This is always false since these components never terminate + * + * @param component + * @return + */ + @Override public boolean hasCompleted(Component component) { + return false; + } + + /** + * This is always false since these components never terminate + * + * @param component + * @return + */ + @Override public boolean hasCompletedSuccessfully(Component component) { + return false; + } + + @Override public boolean shouldRelaunchInstance( + ComponentInstance componentInstance, ContainerStatus containerStatus) { + return true; + } + + @Override public boolean isReadyForDownStream(Component dependentComponent) { + if (dependentComponent.getNumReadyInstances() < dependentComponent + .getNumDesiredInstances()) { + return false; + } + return true; + } + + @Override public boolean allowUpgrades() { + return true; + } + + @Override public boolean shouldTerminate(Component component) { + return false; + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 7979c19..931877e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -18,9 +18,12 @@ package org.apache.hadoop.yarn.service.component; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import static org.apache.hadoop.yarn.service.api.records.Component + .RestartPolicyEnum; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -111,6 +114,13 @@ public class Component implements EventHandler<ComponentEvent> { // The number of containers failed since last reset. This excludes preempted, // disk_failed containers etc. This will be reset to 0 periodically. 
public AtomicInteger currentContainerFailure = new AtomicInteger(0); + + //succeeded and Failed instances are Populated only for RestartPolicyEnum + //.ON_FAILURE/NEVER + private Map<String, ComponentInstance> succeededInstances = + new ConcurrentHashMap<>(); + private Map<String, ComponentInstance> failedInstances = + new ConcurrentHashMap<>(); private boolean healthThresholdMonitorEnabled = false; private AtomicBoolean upgradeInProgress = new AtomicBoolean(false); @@ -297,7 +307,7 @@ public class Component implements EventHandler<ComponentEvent> { @Override public ComponentState transition(Component component, ComponentEvent event) { - component.setDesiredContainers((int)event.getDesired()); + component.setDesiredContainers((int) event.getDesired()); if (!component.areDependenciesReady()) { LOG.info("[FLEX COMPONENT {}]: Flex deferred because dependencies not" + " satisfied.", component.getName()); @@ -402,11 +412,37 @@ public class Component implements EventHandler<ComponentEvent> { } } - private static ComponentState checkIfStable(Component component) { + @VisibleForTesting + static ComponentState checkIfStable(Component component) { + if (component.getRestartPolicyHandler().isLongLived()) { + return updateStateForLongRunningComponents(component); + } else{ + //NEVER/ON_FAILURE + return updateStateForTerminatingComponents(component); + } + } + + private static ComponentState updateStateForTerminatingComponents( + Component component) { + if (component.getNumRunningInstances() + component + .getNumSucceededInstances() + component.getNumFailedInstances() + < component.getComponentSpec().getNumberOfContainers()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + return FLEXING; + } else{ + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + return STABLE; + } + } + + private static ComponentState updateStateForLongRunningComponents( + Component 
component) { // if desired == running if (component.componentMetrics.containersReady.value() == component - .getComponentSpec().getNumberOfContainers() && - component.numContainersThatNeedUpgrade.get() == 0) { + .getComponentSpec().getNumberOfContainers() + && component.numContainersThatNeedUpgrade.get() == 0) { component.componentSpec.setState( org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); return STABLE; @@ -425,17 +461,41 @@ public class Component implements EventHandler<ComponentEvent> { // This method should be called whenever there is an increment or decrement // of a READY state container of a component - public static synchronized void checkAndUpdateComponentState( + //This should not matter for terminating components + private static synchronized void checkAndUpdateComponentState( Component component, boolean isIncrement) { org.apache.hadoop.yarn.service.api.records.ComponentState curState = component.componentSpec.getState(); - if (isIncrement) { - // check if all containers are in READY state - if (component.numContainersThatNeedUpgrade.get() == 0 && - component.componentMetrics.containersReady.value() == - component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + + if (component.getRestartPolicyHandler().isLongLived()) { + if (isIncrement) { + // check if all containers are in READY state + if (component.numContainersThatNeedUpgrade.get() == 0 + && component.componentMetrics.containersReady.value() + == component.componentMetrics.containersDesired.value()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + if (curState != component.componentSpec.getState()) { + LOG.info("[COMPONENT {}] state changed from {} -> {}", + component.componentSpec.getName(), curState, + component.componentSpec.getState()); + } + // component state change will trigger re-check of service state + 
component.context.getServiceManager().checkAndUpdateServiceState(); + } + } else{ + // container moving out of READY state could be because of FLEX down so + // still need to verify the count before changing the component state + if (component.componentMetrics.containersReady.value() + < component.componentMetrics.containersDesired.value()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState + .FLEXING); + } else if (component.componentMetrics.containersReady.value() + == component.componentMetrics.containersDesired.value()) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); + } if (curState != component.componentSpec.getState()) { LOG.info("[COMPONENT {}] state changed from {} -> {}", component.componentSpec.getName(), curState, @@ -445,44 +505,38 @@ public class Component implements EventHandler<ComponentEvent> { component.context.getServiceManager().checkAndUpdateServiceState(); } } else { - // container moving out of READY state could be because of FLEX down so - // still need to verify the count before changing the component state - if (component.componentMetrics.containersReady - .value() < component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); - } else if (component.componentMetrics.containersReady - .value() == component.componentMetrics.containersDesired.value()) { - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE); - } - if (curState != component.componentSpec.getState()) { - LOG.info("[COMPONENT {}] state changed from {} -> {}", - component.componentSpec.getName(), curState, - component.componentSpec.getState()); - } // component state change will trigger re-check of service state component.context.getServiceManager().checkAndUpdateServiceState(); } // when the service is stable then the state of 
component needs to // transition to stable - component.dispatcher.getEventHandler().handle(new ComponentEvent( - component.getName(), ComponentEventType.CHECK_STABLE)); + component.dispatcher.getEventHandler().handle( + new ComponentEvent(component.getName(), + ComponentEventType.CHECK_STABLE)); } private static class ContainerCompletedTransition extends BaseTransition { @Override public void transition(Component component, ComponentEvent event) { + component.updateMetrics(event.getStatus()); component.dispatcher.getEventHandler().handle( - new ComponentInstanceEvent(event.getStatus().getContainerId(), - STOP).setStatus(event.getStatus())); - component.componentSpec.setState( - org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); - if (component.context.service.getState().equals(ServiceState.STABLE)) { - component.getScheduler().getApp().setState(ServiceState.STARTED); - LOG.info("Service def state changed from {} -> {}", - ServiceState.STABLE, ServiceState.STARTED); + new ComponentInstanceEvent(event.getStatus().getContainerId(), STOP) + .setStatus(event.getStatus())); + + ComponentRestartPolicy restartPolicy = + component.getRestartPolicyHandler(); + + if (restartPolicy.shouldRelaunchInstance(event.getInstance(), + event.getStatus())) { + component.componentSpec.setState( + org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING); + + if (component.context.service.getState().equals(ServiceState.STABLE)) { + component.getScheduler().getApp().setState(ServiceState.STARTED); + LOG.info("Service def state changed from {} -> {}", + ServiceState.STABLE, ServiceState.STARTED); + } } } } @@ -725,8 +779,6 @@ public class Component implements EventHandler<ComponentEvent> { componentMetrics.containersDesired.set(n); } - - private void updateMetrics(ContainerStatus status) { switch (status.getExitStatus()) { case SUCCESS: @@ -753,7 +805,7 @@ public class Component implements EventHandler<ComponentEvent> { String host = 
scheduler.getLiveInstances().get(status.getContainerId()) .getNodeId().getHost(); failureTracker.incNodeFailure(host); - currentContainerFailure.getAndIncrement() ; + currentContainerFailure.getAndIncrement(); } } @@ -763,17 +815,18 @@ public class Component implements EventHandler<ComponentEvent> { return true; } for (String dependency : dependencies) { - Component dependentComponent = - scheduler.getAllComponents().get(dependency); + Component dependentComponent = scheduler.getAllComponents().get( + dependency); if (dependentComponent == null) { LOG.error("Couldn't find dependency {} for {} (should never happen)", dependency, getName()); continue; } - if (dependentComponent.getNumReadyInstances() < dependentComponent - .getNumDesiredInstances()) { + + if (!dependentComponent.isReadyForDownstream()) { LOG.info("[COMPONENT {}]: Dependency {} not satisfied, only {} of {}" - + " instances are ready.", getName(), dependency, + + " instances are ready or the dependent component has not " + + "completed ", getName(), dependency, dependentComponent.getNumReadyInstances(), dependentComponent.getNumDesiredInstances()); return false; @@ -782,6 +835,7 @@ public class Component implements EventHandler<ComponentEvent> { return true; } + public Map<String, String> getDependencyHostIpTokens() { Map<String, String> tokens = new HashMap<>(); List<String> dependencies = componentSpec.getDependencies(); @@ -955,4 +1009,67 @@ public class Component implements EventHandler<ComponentEvent> { boolean healthThresholdMonitorEnabled) { this.healthThresholdMonitorEnabled = healthThresholdMonitorEnabled; } + + public Collection<ComponentInstance> getSucceededInstances() { + return succeededInstances.values(); + } + + public long getNumSucceededInstances() { + return succeededInstances.size(); + } + + public long getNumFailedInstances() { + return failedInstances.size(); + } + + public Collection<ComponentInstance> getFailedInstances() { + return failedInstances.values(); + } + + public 
synchronized void markAsSucceeded(ComponentInstance instance) { + removeFailedInstanceIfExists(instance); + succeededInstances.put(instance.getCompInstanceName(), instance); + } + + public synchronized void markAsFailed(ComponentInstance instance) { + removeSuccessfulInstanceIfExists(instance); + failedInstances.put(instance.getCompInstanceName(), instance); + } + + public boolean removeFailedInstanceIfExists(ComponentInstance instance) { + if (failedInstances.containsKey(instance.getCompInstanceName())) { + failedInstances.remove(instance.getCompInstanceName()); + return true; + } + return false; + } + + public boolean removeSuccessfulInstanceIfExists(ComponentInstance instance) { + if (succeededInstances.containsKey(instance.getCompInstanceName())) { + succeededInstances.remove(instance.getCompInstanceName()); + return true; + } + return false; + } + + public boolean isReadyForDownstream() { + return getRestartPolicyHandler().isReadyForDownStream(this); + } + + public static ComponentRestartPolicy getRestartPolicyHandler( + RestartPolicyEnum restartPolicyEnum) { + + if (RestartPolicyEnum.NEVER == restartPolicyEnum) { + return NeverRestartPolicy.getInstance(); + } else if (RestartPolicyEnum.ON_FAILURE == restartPolicyEnum) { + return OnFailureRestartPolicy.getInstance(); + } else{ + return AlwaysRestartPolicy.getInstance(); + } + } + + public ComponentRestartPolicy getRestartPolicyHandler() { + RestartPolicyEnum restartPolicyEnum = getComponentSpec().getRestartPolicy(); + return getRestartPolicyHandler(restartPolicyEnum); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java new file mode 100644 index 0000000..23b0fb9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentRestartPolicy.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +/** + * Interface for Component Restart policies. + * Which is used to make decisions on termination/restart of components and + * their instances. 
+ */ +public interface ComponentRestartPolicy { + + boolean isLongLived(); + + boolean hasCompleted(Component component); + + boolean hasCompletedSuccessfully(Component component); + + boolean shouldRelaunchInstance(ComponentInstance componentInstance, + ContainerStatus containerStatus); + + boolean isReadyForDownStream(Component component); + + boolean allowUpgrades(); + + boolean shouldTerminate(Component component); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java new file mode 100644 index 0000000..ace1f89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/NeverRestartPolicy.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +/** + * Policy for components with instances that do not require/support a restart. + */ +public final class NeverRestartPolicy implements ComponentRestartPolicy { + + private static NeverRestartPolicy INSTANCE = new NeverRestartPolicy(); + + private NeverRestartPolicy() { + } + + public static NeverRestartPolicy getInstance() { + return INSTANCE; + } + + @Override public boolean isLongLived() { + return false; + } + + @Override public boolean hasCompleted(Component component) { + if (component.getNumSucceededInstances() + component.getNumFailedInstances() + < component.getNumDesiredInstances()) { + return false; + } + return true; + } + + @Override public boolean hasCompletedSuccessfully(Component component) { + if (component.getNumSucceededInstances() == component + .getNumDesiredInstances()) { + return true; + } + return false; + } + + @Override public boolean shouldRelaunchInstance( + ComponentInstance componentInstance, ContainerStatus containerStatus) { + return false; + } + + @Override public boolean isReadyForDownStream(Component component) { + if (hasCompleted(component)) { + return true; + } + return false; + } + + @Override public boolean allowUpgrades() { + return false; + } + + @Override public boolean shouldTerminate(Component component) { + long nSucceeded = component.getNumSucceededInstances(); + long nFailed = 
component.getNumFailedInstances(); + if (nSucceeded + nFailed < component.getComponentSpec() + .getNumberOfContainers()) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java new file mode 100644 index 0000000..39fba2a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/OnFailureRestartPolicy.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.service.component; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; + +/** + * Policy for components that require restarts for instances on failure. + */ +public final class OnFailureRestartPolicy implements ComponentRestartPolicy { + + private static OnFailureRestartPolicy INSTANCE = new OnFailureRestartPolicy(); + + private OnFailureRestartPolicy() { + } + + public static OnFailureRestartPolicy getInstance() { + return INSTANCE; + } + + @Override public boolean isLongLived() { + return false; + } + + @Override public boolean hasCompleted(Component component) { + if (hasCompletedSuccessfully(component)) { + return true; + } + + return false; + } + + @Override public boolean hasCompletedSuccessfully(Component component) { + if (component.getNumSucceededInstances() == component + .getNumDesiredInstances()) { + return true; + } + + return false; + } + + @Override public boolean shouldRelaunchInstance( + ComponentInstance componentInstance, ContainerStatus containerStatus) { + + if (ComponentInstance.hasContainerFailed(containerStatus)) { + return true; + } + + return false; + } + + @Override public boolean isReadyForDownStream(Component component) { + if (hasCompletedSuccessfully(component)) { + return true; + } + + return false; + } + + @Override public boolean allowUpgrades() { + return false; + } + + @Override public boolean shouldTerminate(Component component) { + long nSucceeded = component.getNumSucceededInstances(); + if (nSucceeded < component.getComponentSpec().getNumberOfContainers()) { + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index a323649..529596d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.component.instance; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.registry.client.api.RegistryConstants; @@ -25,9 +26,9 @@ import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; -import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.component.Component; import 
org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.component.ComponentRestartPolicy; import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher; @@ -96,8 +98,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, // This container object is used for rest API query private org.apache.hadoop.yarn.service.api.records.Container containerSpec; + private static final StateMachineFactory<ComponentInstance, - ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent> + ComponentInstanceState, ComponentInstanceEventType, + ComponentInstanceEvent> stateMachineFactory = new StateMachineFactory<ComponentInstance, ComponentInstanceState, ComponentInstanceEventType, ComponentInstanceEvent>(INIT) @@ -230,6 +234,47 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, } } + @VisibleForTesting + static void handleComponentInstanceRelaunch( + ComponentInstance compInstance, ComponentInstanceEvent event) { + Component comp = compInstance.getComponent(); + + // Do we need to relaunch the service? + boolean hasContainerFailed = hasContainerFailed(event.getStatus()); + + ComponentRestartPolicy restartPolicy = comp.getRestartPolicyHandler(); + + if (restartPolicy.shouldRelaunchInstance(compInstance, event.getStatus())) { + // re-ask the failed container. + comp.requestContainers(1); + comp.reInsertPendingInstance(compInstance); + LOG.info(compInstance.getCompInstanceId() + + ": {} completed. Reinsert back to pending list and requested " + + "a new container." 
+ System.lineSeparator() + + " exitStatus={}, diagnostics={}.", + event.getContainerId(), event.getStatus().getExitStatus(), + event.getStatus().getDiagnostics()); + } else { + // When no relaunch, update component's #succeeded/#failed + // instances. + if (hasContainerFailed) { + comp.markAsFailed(compInstance); + } else { + comp.markAsSucceeded(compInstance); + } + LOG.info(compInstance.getCompInstanceId() + (!hasContainerFailed ? + " succeeded" : + " failed") + " without retry, exitStatus=" + event.getStatus()); + comp.getScheduler().terminateServiceIfAllComponentsFinished(); + } + } + + public static boolean hasContainerFailed(ContainerStatus containerStatus) { + //Mark container as failed if we can't get its exit status, i.e. null? + return containerStatus == null || containerStatus.getExitStatus() != + ContainerExitStatus.SUCCESS; + } + private static class ContainerStoppedTransition extends BaseTransition { // whether the container failed before launched by AM or not. boolean failedBeforeLaunching = false; @@ -244,9 +289,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, @Override public void transition(ComponentInstance compInstance, ComponentInstanceEvent event) { - // re-ask the failed container. + Component comp = compInstance.component; - comp.requestContainers(1); String containerDiag = compInstance.getCompInstanceId() + ": " + event.getStatus() .getDiagnostics(); @@ -259,7 +303,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, compInstance.component.decContainersReady(true); } compInstance.component.decRunningContainers(); - boolean shouldExit = false; + // Should we fail (terminate) the service? 
+ boolean shouldFailService = false; + + final ServiceScheduler scheduler = comp.getScheduler(); // Check if it exceeds the failure threshold, but only if health threshold // monitor is not enabled if (!comp.isHealthThresholdMonitorEnabled() @@ -271,10 +318,10 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, comp.getName(), comp.currentContainerFailure.get(), comp.maxContainerFailurePerComp); compInstance.diagnostics.append(exitDiag); // append to global diagnostics that will be reported to RM. - comp.getScheduler().getDiagnostics().append(containerDiag); - comp.getScheduler().getDiagnostics().append(exitDiag); + scheduler.getDiagnostics().append(containerDiag); + scheduler.getDiagnostics().append(exitDiag); LOG.warn(exitDiag); - shouldExit = true; + shouldFailService = true; } if (!failedBeforeLaunching) { @@ -296,25 +343,14 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, } // remove the failed ContainerId -> CompInstance mapping - comp.getScheduler().removeLiveCompInstance(event.getContainerId()); + scheduler.removeLiveCompInstance(event.getContainerId()); - comp.reInsertPendingInstance(compInstance); + // According to component restart policy, handle container restart + // or finish the service (if all components finished) + handleComponentInstanceRelaunch(compInstance, event); - LOG.info(compInstance.getCompInstanceId() - + ": {} completed. Reinsert back to pending list and requested " + - "a new container." + System.lineSeparator() + - " exitStatus={}, diagnostics={}.", - event.getContainerId(), event.getStatus().getExitStatus(), - event.getStatus().getDiagnostics()); - if (shouldExit) { - // Sleep for 5 seconds in hope that the state can be recorded in ATS. - // in case there's a client polling the comp state, it can be notified. 
- try { - Thread.sleep(5000); - } catch (InterruptedException e) { - LOG.error("Interrupted on sleep while exiting.", e); - } - ExitUtil.terminate(-1); + if (shouldFailService) { + scheduler.getTerminationHandler().terminate(-1); } } } @@ -630,4 +666,9 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, >>> 32)); return result; } + + @VisibleForTesting public org.apache.hadoop.yarn.service.api.records + .Container getContainerSpec() { + return containerSpec; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java index 915b836..707bbf0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.net.DNS; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; @@ -571,4 +572,21 @@ public final class ServiceUtils { // 
Fallback to querying the default hostname as we did before. return InetAddress.getLocalHost().getCanonicalHostName(); } + + /** + * Process termination handler - exits with specified exit code after + * waiting a while for ATS state to be in sync. + */ + public static class ProcessTerminationHandler { + public void terminate(int exitCode) { + // Sleep for 5 seconds in hope that the state can be recorded in ATS. + // in case there's a client polling the comp state, it can be notified. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + log.info("Interrupted on sleep while exiting.", e); + } + ExitUtil.terminate(exitCode); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 599b8a7..86b4cea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -57,6 +57,8 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URL; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; @@ 
-99,8 +101,32 @@ public class ServiceTestUtils { return exampleApp; } + // Example service definition + // 2 components, each of which has 2 containers. + public static Service createTerminatingJobExample(String serviceName) { + Service exampleApp = new Service(); + exampleApp.setName(serviceName); + exampleApp.setVersion("v1"); + exampleApp.addComponent( + createComponent("terminating-comp1", 2, "sleep " + "1000", + Component.RestartPolicyEnum.NEVER, null)); + exampleApp.addComponent( + createComponent("terminating-comp2", 2, "sleep 1000", + Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{ + add("terminating-comp1"); + }})); + exampleApp.addComponent( + createComponent("terminating-comp3", 2, "sleep 1000", + Component.RestartPolicyEnum.ON_FAILURE, new ArrayList<String>() {{ + add("terminating-comp2"); + }})); + + return exampleApp; + } + public static Component createComponent(String name) { - return createComponent(name, 2L, "sleep 1000"); + return createComponent(name, 2L, "sleep 1000", + Component.RestartPolicyEnum.ALWAYS, null); } protected static Component createComponent(String name, long numContainers, @@ -116,6 +142,18 @@ public class ServiceTestUtils { return comp1; } + protected static Component createComponent(String name, long numContainers, + String command, Component.RestartPolicyEnum restartPolicyEnum, + List<String> dependencies) { + Component comp = createComponent(name, numContainers, command); + comp.setRestartPolicy(restartPolicyEnum); + + if (dependencies != null) { + comp.dependencies(dependencies); + } + return comp; + } + public static SliderFileSystem initMockFs() throws IOException { return initMockFs(null); } @@ -306,6 +344,12 @@ public class ServiceTestUtils { return client; } + public static ServiceManager createServiceManager(ServiceContext context) { + ServiceManager serviceManager = new ServiceManager(context); + context.setServiceManager(serviceManager); + return serviceManager; + } + /** * Creates a YarnClient 
for test purposes. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java index 56a0c71..fc509f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -227,14 +227,16 @@ public class TestServiceManager { } public static Service createBaseDef(String name) { + return createDef(name, ServiceTestUtils.createExampleApplication()); + } + + public static Service createDef(String name, Service serviceDef) { ApplicationId applicationId = ApplicationId.newInstance( System.currentTimeMillis(), 1); - Service serviceDef = ServiceTestUtils.createExampleApplication(); serviceDef.setId(applicationId.toString()); serviceDef.setName(name); serviceDef.setState(ServiceState.STARTED); Artifact artifact = createTestArtifact("1"); - serviceDef.getComponents().forEach(component -> component.setArtifact(artifact)); return serviceDef; http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java index 600e438..d7c15ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponent.java @@ -38,8 +38,10 @@ import org.apache.hadoop.yarn.service.api.records.Service; import org.apache.hadoop.yarn.service.component.instance.ComponentInstance; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType; + import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService; import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -50,6 +52,7 @@ import java.util.Iterator; import java.util.Map; import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP; + import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -60,6 +63,9 @@ import static org.mockito.Mockito.when; */ public class TestComponent { + private static final int WAIT_MS_PER_LOOP = 1000; + static final Logger LOG = Logger.getLogger(TestComponent.class); + @Rule public ServiceTestUtils.ServiceFSWatcher rule = new 
ServiceTestUtils.ServiceFSWatcher(); @@ -158,6 +164,57 @@ public class TestComponent { comp.getComponentSpec().getConfiguration().getEnv("key1")); } + @Test + public void testComponentStateUpdatesWithTerminatingComponents() throws + Exception { + final String serviceName = + "testComponentStateUpdatesWithTerminatingComponents"; + + Service testService = ServiceTestUtils.createTerminatingJobExample( + serviceName); + TestServiceManager.createDef(serviceName, testService); + + ServiceContext context = createTestContext(rule, testService); + + for (Component comp : context.scheduler.getAllComponents().values()) { + + Iterator<ComponentInstance> instanceIter = comp. + getAllComponentInstances().iterator(); + + ComponentInstance componentInstance = instanceIter.next(); + Container instanceContainer = componentInstance.getContainer(); + + Assert.assertEquals(0, comp.getNumSucceededInstances()); + Assert.assertEquals(0, comp.getNumFailedInstances()); + Assert.assertEquals(2, comp.getNumRunningInstances()); + Assert.assertEquals(2, comp.getNumReadyInstances()); + Assert.assertEquals(0, comp.getPendingInstances().size()); + + //stop 1 container + ContainerStatus containerStatus = ContainerStatus.newInstance( + instanceContainer.getId(), + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + "successful", 0); + comp.handle(new ComponentEvent(comp.getName(), + ComponentEventType.CONTAINER_COMPLETED).setStatus(containerStatus)); + componentInstance.handle( + new ComponentInstanceEvent(componentInstance.getContainer().getId(), + ComponentInstanceEventType.STOP).setStatus(containerStatus)); + + Assert.assertEquals(1, comp.getNumSucceededInstances()); + Assert.assertEquals(0, comp.getNumFailedInstances()); + Assert.assertEquals(1, comp.getNumRunningInstances()); + Assert.assertEquals(1, comp.getNumReadyInstances()); + Assert.assertEquals(0, comp.getPendingInstances().size()); + + org.apache.hadoop.yarn.service.component.ComponentState componentState = + 
Component.checkIfStable(comp); + Assert.assertEquals( + org.apache.hadoop.yarn.service.component.ComponentState.STABLE, + componentState); + } + } + private static org.apache.hadoop.yarn.service.api.records.Component createSpecWithEnv(String serviceName, String compName, String key, String val) { @@ -171,31 +228,38 @@ public class TestComponent { public static ServiceContext createTestContext( ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName) throws Exception { + return createTestContext(fsWatcher, + TestServiceManager.createBaseDef(serviceName)); + } + + public static ServiceContext createTestContext( + ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef) + throws Exception { ServiceContext context = new ServiceContext(); - context.service = TestServiceManager.createBaseDef(serviceName); + context.service = serviceDef; context.fs = fsWatcher.getFs(); ContainerLaunchService mockLaunchService = mock( ContainerLaunchService.class); context.scheduler = new ServiceScheduler(context) { - @Override - protected YarnRegistryViewForProviders createYarnRegistryOperations( + @Override protected YarnRegistryViewForProviders + createYarnRegistryOperations( ServiceContext context, RegistryOperations registryClient) { return mock(YarnRegistryViewForProviders.class); } - @Override - public NMClientAsync createNMClient() { + @Override public NMClientAsync createNMClient() { NMClientAsync nmClientAsync = super.createNMClient(); NMClient nmClient = mock(NMClient.class); try { when(nmClient.getContainerStatus(anyObject(), anyObject())) - .thenAnswer((Answer<ContainerStatus>) invocation -> - ContainerStatus.newInstance( - (ContainerId) invocation.getArguments()[0], - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, - "", 0)); + .thenAnswer( + (Answer<ContainerStatus>) invocation -> ContainerStatus + .newInstance((ContainerId) invocation.getArguments()[0], + org.apache.hadoop.yarn.api.records.ContainerState + .RUNNING, + "", 0)); } catch 
(YarnException | IOException e) { throw new RuntimeException(e); } @@ -203,16 +267,18 @@ public class TestComponent { return nmClientAsync; } - @Override - public ContainerLaunchService getContainerLaunchService() { + @Override public ContainerLaunchService getContainerLaunchService() { return mockLaunchService; } }; context.scheduler.init(fsWatcher.getConf()); + ServiceTestUtils.createServiceManager(context); + doNothing().when(mockLaunchService). reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject()); stabilizeComponents(context); + return context; } @@ -223,6 +289,8 @@ public class TestComponent { context.attemptId = attemptId; Map<String, Component> componentState = context.scheduler.getAllComponents(); + + int counter = 0; for (org.apache.hadoop.yarn.service.api.records.Component componentSpec : context.service.getComponents()) { Component component = new org.apache.hadoop.yarn.service.component. @@ -230,9 +298,12 @@ public class TestComponent { componentState.put(component.getName(), component); component.handle(new ComponentEvent(component.getName(), ComponentEventType.FLEX)); + for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) { - assignNewContainer(attemptId, i + 1, context, component); + counter++; + assignNewContainer(attemptId, counter, context, component); } + component.handle(new ComponentEvent(component.getName(), ComponentEventType.CHECK_STABLE)); } @@ -241,6 +312,8 @@ public class TestComponent { private static void assignNewContainer( ApplicationAttemptId attemptId, long containerNum, ServiceContext context, Component component) { + + Container container = org.apache.hadoop.yarn.api.records.Container .newInstance(ContainerId.newContainerId(attemptId, containerNum), NODE_ID, "localhost", null, null, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7f083ed8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
new file mode 100644
index 0000000..60f5c91
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/component/TestComponentRestartPolicy.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.component;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for ComponentRestartPolicy implementations.
+ */
+public class TestComponentRestartPolicy {
+
+  @Test
+  public void testAlwaysRestartPolicy() throws Exception {
+
+    AlwaysRestartPolicy alwaysRestartPolicy = AlwaysRestartPolicy.getInstance();
+
+    Component component = mock(Component.class);
+    when(component.getNumReadyInstances()).thenReturn(1);
+    when(component.getNumDesiredInstances()).thenReturn(2);
+
+    ComponentInstance instance = mock(ComponentInstance.class);
+    when(instance.getComponent()).thenReturn(component);
+
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+
+    assertEquals(true, alwaysRestartPolicy.isLongLived());
+    assertEquals(true, alwaysRestartPolicy.allowUpgrades());
+    assertEquals(false, alwaysRestartPolicy.hasCompleted(component));
+    assertEquals(false,
+        alwaysRestartPolicy.hasCompletedSuccessfully(component));
+
+    assertEquals(true,
+        alwaysRestartPolicy.shouldRelaunchInstance(instance, containerStatus));
+
+    assertEquals(false, alwaysRestartPolicy.isReadyForDownStream(component));
+  }
+
+  @Test
+  public void testNeverRestartPolicy() throws Exception {
+
+    NeverRestartPolicy restartPolicy = NeverRestartPolicy.getInstance();
+
+    Component component = mock(Component.class);
+    when(component.getNumSucceededInstances()).thenReturn(new Long(1));
+    when(component.getNumFailedInstances()).thenReturn(new Long(2));
+    when(component.getNumDesiredInstances()).thenReturn(3);
+
+    ComponentInstance instance = mock(ComponentInstance.class);
+    when(instance.getComponent()).thenReturn(component);
+
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+
+    assertEquals(false, restartPolicy.isLongLived());
+    assertEquals(false, restartPolicy.allowUpgrades());
+    assertEquals(true, restartPolicy.hasCompleted(component));
+    assertEquals(false,
+        restartPolicy.hasCompletedSuccessfully(component));
+
+    assertEquals(false,
+        restartPolicy.shouldRelaunchInstance(instance, containerStatus));
+
+    assertEquals(true, restartPolicy.isReadyForDownStream(component));
+  }
+
+  @Test
+  public void testOnFailureRestartPolicy() throws Exception {
+
+    OnFailureRestartPolicy restartPolicy = OnFailureRestartPolicy.getInstance();
+
+    Component component = mock(Component.class);
+    when(component.getNumSucceededInstances()).thenReturn(new Long(3));
+    when(component.getNumFailedInstances()).thenReturn(new Long(0));
+    when(component.getNumDesiredInstances()).thenReturn(3);
+
+    ComponentInstance instance = mock(ComponentInstance.class);
+    when(instance.getComponent()).thenReturn(component);
+
+    ContainerStatus containerStatus = mock(ContainerStatus.class);
+    when(containerStatus.getExitStatus()).thenReturn(0);
+
+    assertEquals(false, restartPolicy.isLongLived());
+    assertEquals(false, restartPolicy.allowUpgrades());
+    assertEquals(true, restartPolicy.hasCompleted(component));
+    assertEquals(true,
+        restartPolicy.hasCompletedSuccessfully(component));
+
+    assertEquals(false,
+        restartPolicy.shouldRelaunchInstance(instance, containerStatus));
+
+    assertEquals(true, restartPolicy.isReadyForDownStream(component));
+
+
+    when(component.getNumSucceededInstances()).thenReturn(new Long(2));
+    when(component.getNumFailedInstances()).thenReturn(new Long(1));
+    when(component.getNumDesiredInstances()).thenReturn(3);
+
+    assertEquals(false, restartPolicy.hasCompleted(component));
+    assertEquals(false,
+        restartPolicy.hasCompletedSuccessfully(component));
+
+    when(containerStatus.getExitStatus()).thenReturn(-1000);
+
+    assertEquals(true,
+        restartPolicy.shouldRelaunchInstance(instance, containerStatus));
+
+    assertEquals(false, restartPolicy.isReadyForDownStream(component));
+
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org