YARN-8018. Added support for initiating yarn service upgrade. Contributed by Chandni Singh
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/27d60a16 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/27d60a16 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/27d60a16 Branch: refs/heads/HDFS-12943 Commit: 27d60a16342fd39973d43b61008f54a8815a6237 Parents: edb202e Author: Eric Yang <ey...@apache.org> Authored: Mon Mar 26 18:46:31 2018 -0400 Committer: Eric Yang <ey...@apache.org> Committed: Mon Mar 26 18:46:31 2018 -0400 ---------------------------------------------------------------------- .../yarn/service/client/ApiServiceClient.java | 18 ++ .../hadoop/yarn/service/webapp/ApiServer.java | 24 ++ .../hadoop/yarn/service/ClientAMProtocol.java | 12 + .../hadoop/yarn/service/ClientAMService.java | 24 ++ .../hadoop/yarn/service/ServiceEvent.java | 49 ++++ .../hadoop/yarn/service/ServiceEventType.java | 28 +++ .../hadoop/yarn/service/ServiceManager.java | 225 +++++++++++++++++++ .../hadoop/yarn/service/ServiceScheduler.java | 19 ++ .../yarn/service/UpgradeComponentsFinder.java | 162 +++++++++++++ .../service/api/records/ComponentState.java | 2 +- .../yarn/service/api/records/ServiceState.java | 2 +- .../yarn/service/client/ServiceClient.java | 153 ++++++++++--- .../yarn/service/component/Component.java | 14 ++ .../yarn/service/component/ComponentEvent.java | 12 + .../service/component/ComponentEventType.java | 4 +- .../yarn/service/component/ComponentState.java | 3 +- .../yarn/service/conf/YarnServiceConstants.java | 2 + .../pb/client/ClientAMProtocolPBClientImpl.java | 26 +++ .../service/ClientAMProtocolPBServiceImpl.java | 24 ++ .../yarn/service/utils/CoreFileSystem.java | 31 ++- .../yarn/service/utils/ServiceApiUtil.java | 28 ++- .../src/main/proto/ClientAMProtocol.proto | 19 +- .../hadoop/yarn/service/ServiceTestUtils.java | 90 +++++++- .../TestDefaultUpgradeComponentsFinder.java | 63 ++++++ .../hadoop/yarn/service/TestServiceManager.java | 156 +++++++++++++ .../yarn/service/TestYarnNativeServices.java | 76 ++++--- .../yarn/service/client/TestServiceClient.java | 125 +++++++++++ .../yarn/service/utils/TestCoreFileSystem.java | 46 ++++ .../hadoop/yarn/client/api/AppAdminClient.java | 16 ++ 29 files changed, 1374 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index 49702e3..e4a245d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -469,4 +469,22 @@ public class ApiServiceClient extends AppAdminClient { return output; } + @Override + public int actionUpgrade(String appName, + String fileName) throws IOException, YarnException { + int result; + try { + Service service = + loadAppJsonFromLocalFS(fileName, appName, null, null); + service.setState(ServiceState.UPGRADING); + String buffer = jsonSerDeser.toJson(service); + ClientResponse response = getApiClient() + .post(ClientResponse.class, buffer); + result = processResponse(response); + } catch (Exception e) { + LOG.error("Failed to upgrade application: ", e); + result = EXIT_EXCEPTION_THROWN; + } + return result; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java index e7979b8..59ee05d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java @@ -375,6 +375,12 @@ public class ApiServer { && updateServiceData.getLifetime() > 0) { return updateLifetime(appName, updateServiceData, ugi); } + + // If an UPGRADE is requested + if (updateServiceData.getState() != null && + updateServiceData.getState() == ServiceState.UPGRADING) { + return upgradeService(updateServiceData, ugi); + } } catch (UndeclaredThrowableException e) { return formatResponse(Status.BAD_REQUEST, e.getCause().getMessage()); @@ -475,6 +481,24 @@ public class ApiServer { return formatResponse(Status.OK, status); } + private Response upgradeService(Service service, + final UserGroupInformation ugi) throws IOException, InterruptedException { + ServiceStatus status = new ServiceStatus(); + ugi.doAs((PrivilegedExceptionAction<Void>) () -> { + ServiceClient sc = getServiceClient(); + sc.init(YARN_CONFIG); + sc.start(); + sc.actionUpgrade(service); + sc.close(); + return null; + }); + LOG.info("Service {} version {} upgrade initialized"); + status.setDiagnostics("Service " + service.getName() + + " version " + service.getVersion() + " saved."); + status.setState(ServiceState.ACCEPTED); + return formatResponse(Status.ACCEPTED, status); + } + /** * Used by negative test case. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java index 516d23d..4422451 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMProtocol.java @@ -23,8 +23,14 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto; + import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; + import java.io.IOException; @@ -37,4 +43,10 @@ public interface ClientAMProtocol { StopResponseProto stop(StopRequestProto requestProto) throws IOException, YarnException; + + UpgradeServiceResponseProto upgrade(UpgradeServiceRequestProto request) + throws IOException, YarnException; + + RestartServiceResponseProto restart(RestartServiceRequestProto request) + throws IOException, YarnException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java index fb73f15..08c36f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java @@ -33,8 +33,12 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.component.ComponentEvent; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.slf4j.Logger; @@ -142,4 +146,24 @@ public class ClientAMService extends AbstractService public InetSocketAddress getBindAddress() { return bindAddress; } + + @Override + public UpgradeServiceResponseProto upgrade( + UpgradeServiceRequestProto request) throws IOException { + ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE); + event.setVersion(request.getVersion()); + context.scheduler.getDispatcher().getEventHandler().handle(event); + LOG.info("Upgrading service to version {} by {}", request.getVersion(), + UserGroupInformation.getCurrentUser()); + return UpgradeServiceResponseProto.newBuilder().build(); + } + + @Override + public RestartServiceResponseProto restart(RestartServiceRequestProto request) + throws IOException, YarnException { + ServiceEvent event = new ServiceEvent(ServiceEventType.START); + context.scheduler.getDispatcher().getEventHandler().handle(event); + LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser()); + return RestartServiceResponseProto.newBuilder().build(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java new file mode 100644 index 0000000..9e7d442 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEvent.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +/** + * Events are handled by {@link ServiceManager} to manage the service + * state. + */ +public class ServiceEvent extends AbstractEvent<ServiceEventType> { + + private final ServiceEventType type; + private String version; + + public ServiceEvent(ServiceEventType serviceEventType) { + super(serviceEventType); + this.type = serviceEventType; + } + + public ServiceEventType getType() { + return type; + } + + public String getVersion() { + return version; + } + + public ServiceEvent setVersion(String version) { + this.version = version; + return this; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java new file mode 100644 index 0000000..2162eb5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceEventType.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +/** + * Types of {@link ServiceEvent}. + */ +public enum ServiceEventType { + START, + UPGRADE, + STOP_UPGRADE +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java new file mode 100644 index 0000000..a3fbe89 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceManager.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.component.ComponentEvent; +import org.apache.hadoop.yarn.service.component.ComponentEventType; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.apache.hadoop.yarn.service.utils.SliderFileSystem; +import org.apache.hadoop.yarn.state.InvalidStateTransitionException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; +import org.apache.hadoop.yarn.state.StateMachine; +import org.apache.hadoop.yarn.state.StateMachineFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.MessageFormat; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Manages the state of the service. + */ +public class ServiceManager implements EventHandler<ServiceEvent> { + private static final Logger LOG = LoggerFactory.getLogger( + ServiceManager.class); + + private final Service serviceSpec; + private final ServiceContext context; + private final ServiceScheduler scheduler; + private final ReentrantReadWriteLock.ReadLock readLock; + private final ReentrantReadWriteLock.WriteLock writeLock; + + private final StateMachine<State, ServiceEventType, ServiceEvent> + stateMachine; + + private final AsyncDispatcher dispatcher; + private final SliderFileSystem fs; + private final UpgradeComponentsFinder componentsFinder; + + private String upgradeVersion; + + private static final StateMachineFactory<ServiceManager, State, + ServiceEventType, ServiceEvent> STATE_MACHINE_FACTORY = + new StateMachineFactory<ServiceManager, State, + ServiceEventType, ServiceEvent>(State.STABLE) + + .addTransition(State.STABLE, EnumSet.of(State.STABLE, + State.UPGRADING), ServiceEventType.UPGRADE, + new StartUpgradeTransition()) + + .addTransition(State.UPGRADING, EnumSet.of(State.STABLE, + State.UPGRADING), ServiceEventType.START, + new StopUpgradeTransition()) + .installTopology(); + + public ServiceManager(ServiceContext context) { + Preconditions.checkNotNull(context); + this.context = context; + serviceSpec = context.service; + scheduler = context.scheduler; + stateMachine = STATE_MACHINE_FACTORY.make(this); + dispatcher = scheduler.getDispatcher(); + + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + fs = context.fs; + componentsFinder = new UpgradeComponentsFinder + .DefaultUpgradeComponentsFinder(); + } + + @Override + public void handle(ServiceEvent event) { + try { + writeLock.lock(); + State oldState = getState(); + try { + stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitionException e) { + LOG.error(MessageFormat.format( + "[SERVICE]: Invalid event {0} at {1}.", event.getType(), + oldState), e); + } + if (oldState != getState()) { + LOG.info("[SERVICE] Transitioned from {} to {} on {} event.", + oldState, getState(), event.getType()); + } + } finally { + writeLock.unlock(); + } + } + + private State getState() { + this.readLock.lock(); + try { + return this.stateMachine.getCurrentState(); + } finally { + this.readLock.unlock(); + } + } + + private static class StartUpgradeTransition implements + MultipleArcTransition<ServiceManager, ServiceEvent, State> { + + @Override + public State transition(ServiceManager serviceManager, + ServiceEvent event) { + try { + Service targetSpec = ServiceApiUtil.loadServiceUpgrade( + serviceManager.fs, serviceManager.getName(), event.getVersion()); + + serviceManager.serviceSpec.setState(ServiceState.UPGRADING); + List<org.apache.hadoop.yarn.service.api.records.Component> + compsThatNeedUpgrade = serviceManager.componentsFinder. + findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec); + + if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) { + compsThatNeedUpgrade.forEach(component -> { + ComponentEvent needUpgradeEvent = new ComponentEvent( + component.getName(), ComponentEventType.UPGRADE). + setTargetSpec(component); + serviceManager.dispatcher.getEventHandler().handle( + needUpgradeEvent); + }); + } + serviceManager.upgradeVersion = event.getVersion(); + return State.UPGRADING; + } catch (Throwable e) { + LOG.error("[SERVICE]: Upgrade to version {} failed", event.getVersion(), + e); + return State.STABLE; + } + } + } + + private static class StopUpgradeTransition implements + MultipleArcTransition<ServiceManager, ServiceEvent, State> { + + @Override + public State transition(ServiceManager serviceManager, + ServiceEvent event) { + //abort is not supported currently + //trigger re-check of service state + ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler, + true); + if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) { + return serviceManager.finalizeUpgrade() ? State.STABLE : + State.UPGRADING; + } else { + return State.UPGRADING; + } + } + } + + /** + * @return whether finalization of upgrade was successful. + */ + private boolean finalizeUpgrade() { + try { + Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade( + fs, getName(), upgradeVersion); + ServiceApiUtil.writeAppDefinition(fs, + ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec); + } catch (IOException e) { + LOG.error("Upgrade did not complete because unable to overwrite the" + + " service definition", e); + return false; + } + + try { + fs.deleteClusterUpgradeDir(getName(), upgradeVersion); + } catch (IOException e) { + LOG.warn("Unable to delete upgrade definition for service {} " + + "version {}", getName(), upgradeVersion); + } + serviceSpec.setVersion(upgradeVersion); + upgradeVersion = null; + return true; + } + + /** + * Returns the name of the service. + */ + public String getName() { + return serviceSpec.getName(); + } + + /** + * State of {@link ServiceManager}. + */ + public enum State { + STABLE, UPGRADING + } + + + @VisibleForTesting + Service getServiceSpec() { + return serviceSpec; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index 6333197..79eef49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -110,6 +110,9 @@ public class ServiceScheduler extends CompositeService { LoggerFactory.getLogger(ServiceScheduler.class); private Service app; + // This encapsulates the <code>app</code> with methods to upgrade the app. + private ServiceManager serviceManager; + // component_name -> component private final Map<String, Component> componentsByName = new ConcurrentHashMap<>(); @@ -192,6 +195,7 @@ public class ServiceScheduler extends CompositeService { addIfService(nmClient); dispatcher = new AsyncDispatcher("Component dispatcher"); + dispatcher.register(ServiceEventType.class, new ServiceEventHandler()); dispatcher.register(ComponentEventType.class, new ComponentEventHandler()); dispatcher.register(ComponentInstanceEventType.class, @@ -300,6 +304,7 @@ public class ServiceScheduler extends CompositeService { // Since AM has been started and registered, the service is in STARTED state app.setState(ServiceState.STARTED); + serviceManager = new ServiceManager(context); // recover components based on containers sent from RM recoverComponents(response); @@ -510,6 +515,20 @@ public class ServiceScheduler extends CompositeService { } } + private final class ServiceEventHandler + implements EventHandler<ServiceEvent> { + @Override + public void handle(ServiceEvent event) { + try { + serviceManager.handle(event); + } catch (Throwable t) { + LOG.error(MessageFormat + .format("[SERVICE]: Error in handling event type {0}", + event.getType()), t); + } + } + } + private final class ComponentEventHandler implements EventHandler<ComponentEvent> { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java new file mode 100644 index 0000000..e18697b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/UpgradeComponentsFinder.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Finds all the the target component specs. + */ +public interface UpgradeComponentsFinder { + + List<Component> findTargetComponentSpecs(Service currentDef, + Service targetDef); + + /** + * Default implementation of {@link UpgradeComponentsFinder} that finds all + * the target component specs. + */ + class DefaultUpgradeComponentsFinder implements UpgradeComponentsFinder { + + @Override + public List<Component> findTargetComponentSpecs(Service currentDef, + Service targetDef) { + if (currentDef.getComponents().size() != + targetDef.getComponents().size()) { + throw new UnsupportedOperationException( + "addition/deletion of components not supported by upgrade"); + } + if (!currentDef.getKerberosPrincipal().equals( + targetDef.getKerberosPrincipal())) { + throw new UnsupportedOperationException("changes to kerberos " + + "principal not supported by upgrade"); + } + if (!Objects.equals(currentDef.getQueue(), targetDef.getQueue())) { + throw new UnsupportedOperationException("changes to queue " + + "not supported by upgrade"); + } + if (!Objects.equals(currentDef.getPlacementPolicy(), + targetDef.getPlacementPolicy())) { + throw new UnsupportedOperationException("changes to placement policy " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getResource(), targetDef.getResource())) { + throw new UnsupportedOperationException("changes to resource " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getDescription(), + targetDef.getDescription())) { + throw new UnsupportedOperationException("changes to description " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getQuicklinks(), + targetDef.getQuicklinks())) { + throw new UnsupportedOperationException("changes to quick links " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getLaunchTime(), + targetDef.getLaunchTime())) { + throw new UnsupportedOperationException("changes to launch time " + + "not supported by upgrade"); + } + + + if (!Objects.equals(currentDef.getLifetime(), + targetDef.getLifetime())) { + throw new UnsupportedOperationException("changes to lifetime " + + "not supported by upgrade"); + } + + if (!Objects.equals(currentDef.getConfiguration(), + currentDef.getConfiguration())) { + return targetDef.getComponents(); + } + + if (!Objects.equals(currentDef.getArtifact(), targetDef.getArtifact())) { + return targetDef.getComponents(); + } + + List<Component> targetComps = new ArrayList<>(); + targetDef.getComponents().forEach(component -> { + Component currentComp = currentDef.getComponent(component.getName()); + + if (!Objects.equals(currentComp.getName(), component.getName())) { + throw new UnsupportedOperationException( + "changes to component name not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getDependencies(), + component.getDependencies())) { + throw new UnsupportedOperationException( + "changes to component dependencies not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getReadinessCheck(), + component.getReadinessCheck())) { + throw new UnsupportedOperationException( + "changes to component readiness check not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getResource(), + component.getResource())) { + throw new UnsupportedOperationException( + "changes to component resource not supported by upgrade"); + } + + + if (!Objects.equals(currentComp.getRunPrivilegedContainer(), + component.getRunPrivilegedContainer())) { + throw new UnsupportedOperationException( + "changes to run privileged container not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getPlacementPolicy(), + component.getPlacementPolicy())) { + throw new UnsupportedOperationException( + "changes to component placement policy not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getQuicklinks(), + component.getQuicklinks())) { + throw new UnsupportedOperationException( + "changes to component quick links not supported by upgrade"); + } + + if (!Objects.equals(currentComp.getArtifact(), + component.getArtifact()) || + !Objects.equals(currentComp.getLaunchCommand(), + component.getLaunchCommand()) || + !Objects.equals(currentComp.getConfiguration(), + component.getConfiguration())) { + targetComps.add(component); + } + }); + return targetComps; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java index 702a9ae..f7eda7b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ComponentState.java @@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable @ApiModel(description = "The current state of a component.") public enum ComponentState { - FLEXING, STABLE + FLEXING, STABLE, NEEDS_UPGRADE; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java index 902a0b1..286eaa2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/ServiceState.java @@ -29,5 +29,5 @@ import org.apache.hadoop.classification.InterfaceStability; @ApiModel(description = "The current state of an service.") @javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00") public enum ServiceState { - ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX; + ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index 5731e11..04ca943 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.client; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -55,7 +56,9 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; import org.apache.hadoop.yarn.service.ClientAMProtocol; import org.apache.hadoop.yarn.service.ServiceMaster; import org.apache.hadoop.yarn.service.api.records.Component; @@ -73,8 +76,8 @@ import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.ServiceRegistryUtils; -import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.ServiceUtils; +import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.service.utils.ZookeeperUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Times; @@ -186,6 +189,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return service; } + @Override public int actionSave(String fileName, String serviceName, Long lifetime, String queue) throws IOException, YarnException { return actionBuild(loadAppJsonFromLocalFS(fileName, serviceName, @@ -194,9 +198,54 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, public int actionBuild(Service service) throws YarnException, IOException { - Path appDir = checkAppNotExistOnHdfs(service); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); - createDirAndPersistApp(appDir, service); + Path appDir = checkAppNotExistOnHdfs(service, false); + ServiceApiUtil.createDirAndPersistApp(fs, appDir, service); + return EXIT_SUCCESS; + } + + @Override + public int actionUpgrade(String appName, String fileName) + throws IOException, YarnException { + checkAppExistOnHdfs(appName); + Service upgradeService = loadAppJsonFromLocalFS(fileName, appName, + null, null); + return actionUpgrade(upgradeService); + } + + public int actionUpgrade(Service service) throws YarnException, IOException { + Service persistedService = + ServiceApiUtil.loadService(fs, service.getName()); + if (!StringUtils.isEmpty(persistedService.getId())) { + cachedAppInfo.put(persistedService.getName(), new AppInfo( + ApplicationId.fromString(persistedService.getId()), + persistedService.getKerberosPrincipal().getPrincipalName())); + } + + if (persistedService.getVersion().equals(service.getVersion())) { + String message = + service.getName() + " is already at version " + service.getVersion() + + ". There is nothing to upgrade."; + LOG.error(message); + throw new YarnException(message); + } + + Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true); + ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); + ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service); + + ApplicationReport appReport = + yarnClient.getApplicationReport(getAppId(service.getName())); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(service.getName() + " AM hostname is empty"); + } + ClientAMProtocol proxy = createAMProxy(service.getName(), appReport); + + UpgradeServiceRequestProto.Builder requestBuilder = + UpgradeServiceRequestProto.newBuilder(); + requestBuilder.setVersion(service.getVersion()); + + proxy.upgrade(requestBuilder.build()); return EXIT_SUCCESS; } @@ -212,16 +261,16 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, String serviceName = service.getName(); ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); verifyNoLiveAppInRM(serviceName, "create"); - Path appDir = checkAppNotExistOnHdfs(service); + Path appDir = checkAppNotExistOnHdfs(service, false); // Write the definition first and then submit - AM will read the definition - createDirAndPersistApp(appDir, service); + ServiceApiUtil.createDirAndPersistApp(fs, appDir, service); ApplicationId appId = submitApp(service); cachedAppInfo.put(serviceName, new AppInfo(appId, service .getKerberosPrincipal().getPrincipalName())); service.setId(appId.toString()); // update app definition with appId - persistAppDef(appDir, service); + ServiceApiUtil.writeAppDefinition(fs, appDir, service); return appId; } @@ -349,6 +398,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return original; } + @Override public int actionStop(String serviceName) throws YarnException, IOException { return actionStop(serviceName, true); @@ -424,6 +474,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return EXIT_SUCCESS; } + @Override public int actionDestroy(String serviceName) throws YarnException, IOException { ServiceApiUtil.validateNameFormat(serviceName, getConfig()); @@ -557,8 +608,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, } } - private ApplicationId submitApp(Service app) - throws IOException, YarnException { + @VisibleForTesting + ApplicationId submitApp(Service app) throws IOException, YarnException { String serviceName = app.getName(); Configuration conf = getConfig(); Path appRootDir = fs.buildClusterDirPath(app.getName()); @@ -772,29 +823,64 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return hasAMLog4j; } + @Override public int actionStart(String serviceName) throws YarnException, IOException { ServiceApiUtil.validateNameFormat(serviceName, getConfig()); - Path appDir = checkAppExistOnHdfs(serviceName); - Service service = ServiceApiUtil.loadService(fs, serviceName); - ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); - // see if it is actually running and bail out; - verifyNoLiveAppInRM(serviceName, "start"); - ApplicationId appId = submitApp(service); - service.setId(appId.toString()); - // write app definition on to hdfs - Path appJson = persistAppDef(appDir, service); - LOG.info("Persisted service " + service.getName() + " at " + appJson); - return 0; + Service liveService = getStatus(serviceName); + if (liveService == null || + !liveService.getState().equals(ServiceState.UPGRADING)) { + Path appDir = checkAppExistOnHdfs(serviceName); + Service service = ServiceApiUtil.loadService(fs, serviceName); + ServiceApiUtil.validateAndResolveService(service, fs, getConfig()); + // see if it is actually running and bail out; + verifyNoLiveAppInRM(serviceName, "start"); + ApplicationId appId = submitApp(service); + service.setId(appId.toString()); + // write app definition on to hdfs + Path appJson = ServiceApiUtil.writeAppDefinition(fs, appDir, service); + LOG.info("Persisted service " + service.getName() + " at " + appJson); + return 0; + } else { + LOG.info("Finalize service {} upgrade"); + ApplicationReport appReport = + yarnClient.getApplicationReport(getAppId(serviceName)); + if (StringUtils.isEmpty(appReport.getHost())) { + throw new YarnException(serviceName + " AM hostname is empty"); + } + ClientAMProtocol proxy = createAMProxy(serviceName, appReport); + + RestartServiceRequestProto.Builder requestBuilder = + RestartServiceRequestProto.newBuilder(); + proxy.restart(requestBuilder.build()); + return 0; + } } - private Path checkAppNotExistOnHdfs(Service service) + /** + * Verifies that the service definition does not exist on hdfs. + * + * @param service service + * @param isUpgrade true for upgrades; false otherwise + * @return path to the service definition.. + * @throws IOException + * @throws SliderException + */ + private Path checkAppNotExistOnHdfs(Service service, boolean isUpgrade) throws IOException, SliderException { - Path appDir = fs.buildClusterDirPath(service.getName()); + Path appDir = !isUpgrade ? fs.buildClusterDirPath(service.getName()) : + fs.buildClusterUpgradeDirPath(service.getName(), service.getVersion()); fs.verifyDirectoryNonexistent( new Path(appDir, service.getName() + ".json")); return appDir; } + /** + * Verifies that the service exists on hdfs. + * @param serviceName service name + * @return path to the service definition. + * @throws IOException + * @throws SliderException + */ private Path checkAppExistOnHdfs(String serviceName) throws IOException, SliderException { Path appDir = fs.buildClusterDirPath(serviceName); @@ -802,20 +888,6 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return appDir; } - private void createDirAndPersistApp(Path appDir, Service service) - throws IOException, SliderException { - FsPermission appDirPermission = new FsPermission("750"); - fs.createWithPermissions(appDir, appDirPermission); - Path appJson = persistAppDef(appDir, service); - LOG.info("Persisted service " + service.getName() + " at " + appJson); - } - - private Path persistAppDef(Path appDir, Service service) throws IOException { - Path appJson = new Path(appDir, service.getName() + ".json"); - jsonSerDeser.save(fs.getFileSystem(), appJson, service, true); - return appJson; - } - private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext) throws IOException { if (!UserGroupInformation.isSecurityEnabled()) { @@ -1074,6 +1146,17 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, UserGroupInformation.getCurrentUser(), rpc, address); } + @VisibleForTesting + void setFileSystem(SliderFileSystem fileSystem) + throws IOException { + this.fs = fileSystem; + } + + @VisibleForTesting + void setYarnClient(YarnClient yarnClient) { + this.yarnClient = yarnClient; + } + public synchronized ApplicationId getAppId(String serviceName) throws IOException, YarnException { if (cachedAppInfo.containsKey(serviceName)) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index 3090692..0cd7e2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -138,6 +138,12 @@ public class Component implements EventHandler<ComponentEvent> { // For flex down, go to STABLE state .addTransition(STABLE, EnumSet.of(STABLE, FLEXING), FLEX, new FlexComponentTransition()) + .addTransition(STABLE, UPGRADING, UPGRADE, + new ComponentNeedsUpgradeTransition()) + .addTransition(FLEXING, UPGRADING, UPGRADE, + new ComponentNeedsUpgradeTransition()) + .addTransition(UPGRADING, UPGRADING, UPGRADE, + new ComponentNeedsUpgradeTransition()) .installTopology(); public Component( @@ -355,6 +361,14 @@ public class Component implements EventHandler<ComponentEvent> { } } + private static class ComponentNeedsUpgradeTransition extends BaseTransition { + @Override + public void transition(Component component, ComponentEvent event) { + component.componentSpec.setState(org.apache.hadoop.yarn.service.api. + records.ComponentState.NEEDS_UPGRADE); + } + } + public void removePendingInstance(ComponentInstance instance) { pendingInstances.remove(instance); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java index 447b436..7bd5cb9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEvent.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service.component; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -32,6 +33,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> { private ComponentInstance instance; private ContainerStatus status; private ContainerId containerId; + private org.apache.hadoop.yarn.service.api.records.Component targetSpec; public ContainerId getContainerId() { return containerId; @@ -91,4 +93,14 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> { this.status = status; return this; } + + public org.apache.hadoop.yarn.service.api.records.Component getTargetSpec() { + return targetSpec; + } + + public ComponentEvent setTargetSpec( + org.apache.hadoop.yarn.service.api.records.Component targetSpec) { + this.targetSpec = Preconditions.checkNotNull(targetSpec); + return this; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java index 067302d..970788a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -23,5 +23,7 @@ public enum ComponentEventType { CONTAINER_ALLOCATED, CONTAINER_RECOVERED, CONTAINER_STARTED, - CONTAINER_COMPLETED + CONTAINER_COMPLETED, + UPGRADE, + STOP_UPGRADE } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java index a5f9ff4..0f63d03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentState.java @@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.service.component; public enum ComponentState { INIT, FLEXING, - STABLE + STABLE, + UPGRADING } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java index 0378d24..7b474f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java @@ -92,4 +92,6 @@ public interface YarnServiceConstants { String CONTENT = "content"; String PRINCIPAL = "yarn.service.am.principal"; + + String UPGRADE_DIR = "upgrade"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java index 33e33a6..8152225 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/client/ClientAMProtocolPBClientImpl.java @@ -35,8 +35,12 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; public class ClientAMProtocolPBClientImpl implements ClientAMProtocol, Closeable { @@ -88,4 +92,26 @@ public class ClientAMProtocolPBClientImpl RPC.stopProxy(this.proxy); } } + + @Override + public UpgradeServiceResponseProto upgrade( + UpgradeServiceRequestProto request) throws IOException, YarnException { + try { + return proxy.upgradeService(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } + + @Override + public RestartServiceResponseProto restart(RestartServiceRequestProto request) + throws IOException, YarnException { + try { + return proxy.restartService(null, request); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + } + return null; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java index 7100781..1a1a1ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/impl/pb/service/ClientAMProtocolPBServiceImpl.java @@ -25,6 +25,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto; import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceResponseProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto; +import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto; import org.apache.hadoop.yarn.service.ClientAMProtocol; import java.io.IOException; @@ -67,4 +71,24 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB { throw new ServiceException(e); } } + + @Override + public UpgradeServiceResponseProto upgradeService(RpcController controller, + UpgradeServiceRequestProto request) throws ServiceException { + try { + return real.upgrade(request); + } catch (IOException | YarnException e) { + throw new ServiceException(e); + } + } + + @Override + public RestartServiceResponseProto restartService(RpcController controller, + RestartServiceRequestProto request) throws ServiceException { + try { + return real.restart(request); + } catch (IOException | YarnException e) { + throw new ServiceException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java index 284825e..5c2bac6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/CoreFileSystem.java @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -112,11 +111,39 @@ public class CoreFileSystem { public Path buildClusterDirPath(String clustername) { Preconditions.checkNotNull(clustername); Path path = getBaseApplicationPath(); - return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + clustername); + return new Path(path, YarnServiceConstants.SERVICES_DIRECTORY + "/" + + clustername); } + /** + * Build up the upgrade path string for a cluster. No attempt to + * create the directory is made. + * + * @param clusterName name of the cluster + * @param version version of the cluster + * @return the upgrade path to the cluster + */ + public Path buildClusterUpgradeDirPath(String clusterName, String version) { + Preconditions.checkNotNull(clusterName); + Preconditions.checkNotNull(version); + return new Path(buildClusterDirPath(clusterName), + YarnServiceConstants.UPGRADE_DIR + "/" + version); + } /** + * Delete the upgrade cluster directory. + * @param clusterName name of the cluster + * @param version version of the cluster + * @throws IOException + */ + public void deleteClusterUpgradeDir(String clusterName, String version) + throws IOException { + Preconditions.checkNotNull(clusterName); + Preconditions.checkNotNull(version); + Path upgradeCluster = buildClusterUpgradeDirPath(clusterName, version); + fileSystem.delete(upgradeCluster, true); + } + /** * Build up the path string for keytab install location -no attempt to * create the directory is made * http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java index 0591775..13d9a37 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java @@ -22,9 +22,9 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.registry.client.api.RegistryConstants; import org.apache.hadoop.registry.client.binding.RegistryUtils; -import org.apache.hadoop.security.HadoopKerberosName; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.service.api.records.Service; @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.service.api.records.Artifact; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Resource; +import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.provider.AbstractClientProvider; import org.apache.hadoop.yarn.service.provider.ProviderFactory; import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils; @@ -275,6 +276,14 @@ public class ServiceApiUtil { return jsonSerDeser.load(fs.getFileSystem(), serviceJson); } + public static Service loadServiceUpgrade(SliderFileSystem fs, + String serviceName, String version) throws IOException { + Path versionPath = fs.buildClusterUpgradeDirPath(serviceName, version); + Path versionedDef = new Path(versionPath, serviceName + ".json"); + LOG.info("Loading service definition from {}", versionedDef); + return jsonSerDeser.load(fs.getFileSystem(), versionedDef); + } + public static Service loadServiceFrom(SliderFileSystem fs, Path appDefPath) throws IOException { LOG.info("Loading service definition from " + appDefPath); @@ -429,6 +438,23 @@ public class ServiceApiUtil { return sortByDependencies(components, sortedComponents); } + public static void createDirAndPersistApp(SliderFileSystem fs, Path appDir, + Service service) + throws IOException, SliderException { + FsPermission appDirPermission = new FsPermission("750"); + fs.createWithPermissions(appDir, appDirPermission); + Path appJson = writeAppDefinition(fs, appDir, service); + LOG.info("Persisted service {} version {} at {}", service.getName(), + service.getVersion(), appJson); + } + + public static Path writeAppDefinition(SliderFileSystem fs, Path appDir, + Service service) throws IOException { + Path appJson = new Path(appDir, service.getName() + ".json"); + jsonSerDeser.save(fs.getFileSystem(), appJson, service, true); + return appJson; + } + public static String $(String s) { return "${" + s +"}"; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto index 0a21c24..3677593 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/proto/ClientAMProtocol.proto @@ -26,6 +26,10 @@ service ClientAMProtocolService { rpc flexComponents(FlexComponentsRequestProto) returns (FlexComponentsResponseProto); rpc getStatus(GetStatusRequestProto) returns (GetStatusResponseProto); rpc stop(StopRequestProto) returns (StopResponseProto); + rpc upgradeService(UpgradeServiceRequestProto) + returns (UpgradeServiceResponseProto); + rpc restartService(RestartServiceRequestProto) + returns (RestartServiceResponseProto); } message FlexComponentsRequestProto { @@ -37,7 +41,7 @@ message ComponentCountProto { optional int64 numberOfContainers = 2; } -message FlexComponentsResponseProto{ +message FlexComponentsResponseProto { } message GetStatusRequestProto { @@ -53,4 +57,17 @@ message StopRequestProto { message StopResponseProto { +} + +message UpgradeServiceRequestProto { + optional string version = 1; +} + +message UpgradeServiceResponseProto { +} + +message RestartServiceRequestProto { +} + +message RestartServiceResponseProto { } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 9933211..8347eb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.service; +import com.google.common.base.Throwables; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; import org.apache.hadoop.conf.Configuration; @@ -26,18 +27,23 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.http.HttpServer2; -import org.apache.hadoop.yarn.service.api.records.Service; -import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.Resource; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.YarnServiceConf; +import org.apache.hadoop.yarn.service.exceptions.SliderException; import org.apache.hadoop.yarn.service.utils.JsonSerDeser; import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; import org.apache.hadoop.yarn.service.utils.SliderFileSystem; import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree; import org.codehaus.jackson.map.PropertyNamingStrategy; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +53,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URL; +import java.nio.file.Paths; +import java.util.Map; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC; @@ -78,7 +86,7 @@ public class ServiceTestUtils { // Example service definition // 2 components, each of which has 2 containers. - protected Service createExampleApplication() { + public static Service createExampleApplication() { Service exampleApp = new Service(); exampleApp.setName("example-app"); exampleApp.setVersion("v1"); @@ -176,7 +184,7 @@ public class ServiceTestUtils { zkCluster.start(); conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString()); conf.set(KEY_REGISTRY_ZK_QUORUM, zkCluster.getConnectString()); - LOG.info("ZK cluster: " + zkCluster.getConnectString()); + LOG.info("ZK cluster: " + zkCluster.getConnectString()); fs = FileSystem.get(conf); basedir = new File("target", "apps"); @@ -268,4 +276,78 @@ public class ServiceTestUtils { } } + /** + * Creates a {@link ServiceClient} for test purposes. + */ + public static ServiceClient createClient(Configuration conf) + throws Exception { + ServiceClient client = new ServiceClient() { + @Override + protected Path addJarResource(String appName, + Map<String, LocalResource> localResources) + throws IOException, SliderException { + // do nothing, the Unit test will use local jars + return null; + } + }; + client.init(conf); + client.start(); + return client; + } + + + /** + * Watcher to initialize yarn service base path under target and deletes the + * the test directory when finishes. + */ + public static class ServiceFSWatcher extends TestWatcher { + private YarnConfiguration conf; + private SliderFileSystem fs; + private java.nio.file.Path serviceBasePath; + + @Override + protected void starting(Description description) { + conf = new YarnConfiguration(); + delete(description); + serviceBasePath = Paths.get("target", + description.getClassName(), description.getMethodName()); + conf.set(YARN_SERVICE_BASE_PATH, serviceBasePath.toString()); + try { + fs = new SliderFileSystem(conf); + } catch (IOException e) { + Throwables.propagate(e); + } + } + + @Override + protected void finished(Description description) { + delete(description); + } + + private void delete(Description description) { + FileUtils.deleteQuietly(Paths.get("target", + description.getClassName()).toFile()); + } + + /** + * Returns the yarn conf. + */ + public YarnConfiguration getConf() { + return conf; + } + + /** + * Returns the file system. + */ + public SliderFileSystem getFs() { + return fs; + } + + /** + * Returns the test service base path. + */ + public java.nio.file.Path getServiceBasePath() { + return serviceBasePath; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java new file mode 100644 index 0000000..086fdbe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestDefaultUpgradeComponentsFinder.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +/** + * Tests for {@link UpgradeComponentsFinder.DefaultUpgradeComponentsFinder}. + */ +public class TestDefaultUpgradeComponentsFinder { + + private UpgradeComponentsFinder.DefaultUpgradeComponentsFinder finder = + new UpgradeComponentsFinder.DefaultUpgradeComponentsFinder(); + + @Test + public void testServiceArtifactChange() { + Service currentDef = ServiceTestUtils.createExampleApplication(); + Service targetDef = ServiceTestUtils.createExampleApplication(); + targetDef.getComponents().forEach(x -> x.setArtifact( + TestServiceManager.createTestArtifact("v1"))); + + Assert.assertEquals("all components need upgrade", + targetDef.getComponents(), finder.findTargetComponentSpecs(currentDef, + targetDef)); + } + + @Test + public void testComponentArtifactChange() { + Service currentDef = TestServiceManager.createBaseDef("test"); + Service targetDef = TestServiceManager.createBaseDef("test"); + + targetDef.getComponents().get(0).setArtifact( + TestServiceManager.createTestArtifact("v2")); + + List<Component> expected = new ArrayList<>(); + expected.add(targetDef.getComponents().get(0)); + + Assert.assertEquals("single components needs upgrade", + expected, finder.findTargetComponentSpecs(currentDef, + targetDef)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/27d60a16/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java new file mode 100644 index 0000000..c65a5d4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestServiceManager.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.service; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.service.api.records.Artifact; +import org.apache.hadoop.yarn.service.api.records.ComponentState; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; +import org.apache.hadoop.yarn.service.exceptions.SliderException; +import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders; +import org.apache.hadoop.yarn.service.utils.ServiceApiUtil; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; + +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link ServiceManager}. + */ +public class TestServiceManager { + + @Rule + public ServiceTestUtils.ServiceFSWatcher rule = + new ServiceTestUtils.ServiceFSWatcher(); + + @Test + public void testUpgrade() throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testUpgrade"); + upgrade(serviceManager, "v2", false); + Assert.assertEquals("service not upgraded", ServiceState.UPGRADING, + serviceManager.getServiceSpec().getState()); + } + + @Test + public void testRestartNothingToUpgrade() + throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testRestart"); + upgrade(serviceManager, "v2", false); + + //make components stable + serviceManager.getServiceSpec().getComponents().forEach(comp -> { + comp.setState(ComponentState.STABLE); + }); + serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + Assert.assertEquals("service not re-started", ServiceState.STABLE, + serviceManager.getServiceSpec().getState()); + } + + @Test + public void testRestartWithPendingUpgrade() + throws IOException, SliderException { + ServiceManager serviceManager = createTestServiceManager("testRestart"); + upgrade(serviceManager, "v2", true); + serviceManager.handle(new ServiceEvent(ServiceEventType.START)); + Assert.assertEquals("service should still be upgrading", + ServiceState.UPGRADING, serviceManager.getServiceSpec().getState()); + } + + + private void upgrade(ServiceManager service, String version, + boolean upgradeArtifact) + throws IOException, SliderException { + Service upgradedDef = ServiceTestUtils.createExampleApplication(); + upgradedDef.setName(service.getName()); + upgradedDef.setVersion(version); + if (upgradeArtifact) { + Artifact upgradedArtifact = createTestArtifact("2"); + upgradedDef.getComponents().forEach(component -> { + component.setArtifact(upgradedArtifact); + }); + } + writeUpgradedDef(upgradedDef); + ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE); + upgradeEvent.setVersion("v2"); + service.handle(upgradeEvent); + } + + private ServiceManager createTestServiceManager(String name) + throws IOException { + ServiceContext context = new ServiceContext(); + context.service = createBaseDef(name); + context.fs = rule.getFs(); + + context.scheduler = new ServiceScheduler(context) { + @Override + protected YarnRegistryViewForProviders createYarnRegistryOperations( + ServiceContext context, RegistryOperations registryClient) { + return mock(YarnRegistryViewForProviders.class); + } + }; + + context.scheduler.init(rule.getConf()); + + Map<String, org.apache.hadoop.yarn.service.component.Component> + componentState = context.scheduler.getAllComponents(); + context.service.getComponents().forEach(component -> { + componentState.put(component.getName(), + new org.apache.hadoop.yarn.service.component.Component(component, + 1L, context)); + }); + return new ServiceManager(context); + } + + static Service createBaseDef(String name) { + ApplicationId applicationId = ApplicationId.newInstance( + System.currentTimeMillis(), 1); + Service serviceDef = ServiceTestUtils.createExampleApplication(); + serviceDef.setId(applicationId.toString()); + serviceDef.setName(name); + serviceDef.setState(ServiceState.STARTED); + Artifact artifact = createTestArtifact("1"); + + serviceDef.getComponents().forEach(component -> + component.setArtifact(artifact)); + return serviceDef; + } + + static Artifact createTestArtifact(String artifactId) { + Artifact artifact = new Artifact(); + artifact.setId(artifactId); + artifact.setType(Artifact.TypeEnum.TARBALL); + return artifact; + } + + private void writeUpgradedDef(Service upgradedDef) + throws IOException, SliderException { + Path upgradePath = rule.getFs().buildClusterUpgradeDirPath( + upgradedDef.getName(), upgradedDef.getVersion()); + ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath, + upgradedDef); + } + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org