YARN-6744. Recover component information on YARN native services AM restart. Contributed by Billie Rinaldi
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8a7ef1b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8a7ef1b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8a7ef1b Branch: refs/heads/trunk Commit: b8a7ef1b64392094562e7782e0fd092934724ad2 Parents: c723021 Author: Jian He <jia...@apache.org> Authored: Wed Oct 11 21:05:06 2017 -0700 Committer: Jian He <jia...@apache.org> Committed: Mon Nov 6 13:30:18 2017 -0800 ---------------------------------------------------------------------- .../hadoop-yarn-services-core/pom.xml | 5 ++ .../hadoop/yarn/service/ServiceScheduler.java | 94 ++++++++++++++------ .../yarn/service/component/Component.java | 76 +++++++++++++--- .../service/component/ComponentEventType.java | 1 + .../component/instance/ComponentInstance.java | 27 +++++- .../yarn/service/provider/ProviderUtils.java | 15 ++-- .../registry/YarnRegistryViewForProviders.java | 52 ++++++++--- 7 files changed, 211 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml index 205a64d..851f73b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml @@ -159,6 +159,11 @@ <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java index ec5f3ed..f3824df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.registry.client.api.RegistryOperations; import org.apache.hadoop.registry.client.api.RegistryOperationsFactory; -import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.binding.RegistryUtils; import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; @@ -237,11 +237,6 @@ public class ServiceScheduler extends CompositeService { serviceTimelinePublisher .serviceAttemptUnregistered(context, diagnostics.toString()); } - // Cleanup each component instance. no need to release containers as - // they will be automatically released by RM - for (ComponentInstance instance : liveInstances.values()) { - instance.cleanupRegistryAndCompHdfsDir(); - } String msg = diagnostics.toString() + "Navigate to the failed component for more details."; amRMClient @@ -266,11 +261,67 @@ public class ServiceScheduler extends CompositeService { } registerServiceInstance(context.attemptId, app); - //TODO handle containers recover - } + // recover components based on containers sent from RM + recoverComponents(response); - private void recover() { + for (Component component : componentsById.values()) { + // Trigger initial evaluation of components + if (component.areDependenciesReady()) { + LOG.info("Triggering initial evaluation of component {}", + component.getName()); + ComponentEvent event = new ComponentEvent(component.getName(), FLEX) + .setDesired(component.getComponentSpec().getNumberOfContainers()); + component.handle(event); + } + } + } + private void recoverComponents(RegisterApplicationMasterResponse response) { + List<Container> recoveredContainers = response + .getContainersFromPreviousAttempts(); + LOG.info("Received {} containers from previous attempt.", + recoveredContainers.size()); + Map<String, ServiceRecord> existingRecords = new HashMap<>(); + List<String> existingComps = null; + try { + existingComps = yarnRegistryOperations.listComponents(); + LOG.info("Found {} containers from ZK registry: {}", existingComps.size(), + existingComps); + } catch (Exception e) { + LOG.info("Could not read component paths: {}", e.getMessage()); + } + if (existingComps != null) { + for (String existingComp : existingComps) { + try { + ServiceRecord record = + yarnRegistryOperations.getComponent(existingComp); + existingRecords.put(existingComp, record); + } catch (Exception e) { + LOG.warn("Could not resolve record for component {}: {}", + existingComp, e); + } + } + } + for (Container container : recoveredContainers) { + LOG.info("Handling container {} from previous attempt", + container.getId()); + ServiceRecord record = existingRecords.get(RegistryPathUtils + .encodeYarnID(container.getId().toString())); + if (record != null) { + Component comp = componentsById.get(container.getAllocationRequestId()); + ComponentEvent event = + new ComponentEvent(comp.getName(), CONTAINER_RECOVERED) + .setContainer(container) + .setInstance(comp.getComponentInstance(record.description)); + comp.handle(event); + // do not remove requests in this case because we do not know if they + // have already been removed + } else { + LOG.info("Record not found in registry for container {} from previous" + + " attempt, releasing", container.getId()); + amRMClient.releaseAssignedContainer(container.getId()); + } + } } private void initGlobalTokensForSubstitute(ServiceContext context) { @@ -353,7 +404,7 @@ public class ServiceScheduler extends CompositeService { executorService.submit(new Runnable() { @Override public void run() { try { - yarnRegistryOperations.registerSelf(serviceRecord, true); + yarnRegistryOperations.registerSelf(serviceRecord, false); LOG.info("Registered service under {}; absolute path {}", yarnRegistryOperations.getSelfRegistrationPath(), yarnRegistryOperations.getAbsoluteSelfRegistrationPath()); @@ -398,13 +449,6 @@ public class ServiceScheduler extends CompositeService { componentsById.put(allocateId, component); componentsByName.put(component.getName(), component); allocateId++; - - // Trigger the component without dependencies - if (component.areDependenciesReady()) { - ComponentEvent event = new ComponentEvent(compSpec.getName(), FLEX) - .setDesired(compSpec.getNumberOfContainers()); - component.handle(event); - } } } @@ -458,17 +502,17 @@ public class ServiceScheduler extends CompositeService { new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED) .setContainer(container); dispatcher.getEventHandler().handle(event); + Collection<AMRMClient.ContainerRequest> requests = amRMClient + .getMatchingRequests(container.getAllocationRequestId()); LOG.info("[COMPONENT {}]: {} outstanding container requests.", - comp.getName(), - amRMClient.getMatchingRequests(container.getAllocationRequestId()).size()); + comp.getName(), requests.size()); // remove the corresponding request - Collection<AMRMClient.ContainerRequest> collection = amRMClient - .getMatchingRequests(container.getAllocationRequestId()); - if (collection.iterator().hasNext()) { - AMRMClient.ContainerRequest request = collection.iterator().next(); + if (requests.iterator().hasNext()) { + LOG.info("[COMPONENT {}]: removing one container request.", comp + .getName()); + AMRMClient.ContainerRequest request = requests.iterator().next(); amRMClient.removeContainerRequest(request); } - } } @@ -478,7 +522,7 @@ public class ServiceScheduler extends CompositeService { ContainerId containerId = status.getContainerId(); ComponentInstance instance = liveInstances.get(status.getContainerId()); if (instance == null) { - LOG.error( + LOG.warn( "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ", containerId, status.getExitStatus(), status.getDiagnostics()); return; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java index cbaf472..98bb238 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.service.ServiceContext; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent; import org.apache.hadoop.yarn.service.ServiceMetrics; +import org.apache.hadoop.yarn.service.provider.ProviderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; @@ -78,7 +79,7 @@ public class Component implements EventHandler<ComponentEvent> { private ServiceContext context; private AMRMClientAsync<ContainerRequest> amrmClient; private AtomicLong instanceIdCounter = new AtomicLong(); - private Map<ComponentInstanceId, ComponentInstance> compInstances = + private Map<String, ComponentInstance> compInstances = new ConcurrentHashMap<>(); // component instances to be assigned with a container private List<ComponentInstance> pendingInstances = new LinkedList<>(); @@ -101,6 +102,9 @@ public class Component implements EventHandler<ComponentEvent> { // INIT will only got to FLEXING .addTransition(INIT, EnumSet.of(STABLE, FLEXING), FLEX, new FlexComponentTransition()) + // container recovered on AM restart + .addTransition(INIT, INIT, CONTAINER_RECOVERED, + new ContainerRecoveredTransition()) // container allocated by RM .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED, @@ -165,7 +169,7 @@ public class Component implements EventHandler<ComponentEvent> { new ComponentInstanceId(instanceIdCounter.getAndIncrement(), componentSpec.getName()); ComponentInstance instance = new ComponentInstance(this, id); - compInstances.put(id, instance); + compInstances.put(instance.getCompInstanceName(), instance); pendingInstances.add(instance); } @@ -186,8 +190,8 @@ public class Component implements EventHandler<ComponentEvent> { // This happens on init LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event .getDesired() + " instances."); - component.requestContainers(event.getDesired()); - return FLEXING; + component.requestContainers(component.pendingInstances.size()); + return checkIfStable(component); } long before = component.getComponentSpec().getNumberOfContainers(); long delta = event.getDesired() - before; @@ -205,14 +209,14 @@ public class Component implements EventHandler<ComponentEvent> { LOG.info("[FLEX DOWN COMPONENT " + component.getName() + "]: scaling down from " + before + " to " + event.getDesired()); List<ComponentInstance> list = - new ArrayList<>(component.compInstances.values()); + new ArrayList<>(component.getAllComponentInstances()); // sort in Most recent -> oldest order, destroy most recent ones. Collections.sort(list, Collections.reverseOrder()); for (int i = 0; i < delta; i++) { ComponentInstance instance = list.get(i); // remove the instance - component.compInstances.remove(instance.getCompInstanceId()); + component.compInstances.remove(instance.getCompInstanceName()); component.pendingInstances.remove(instance); component.componentMetrics.containersFailed.incr(); component.componentMetrics.containersRunning.decr(); @@ -236,6 +240,46 @@ public class Component implements EventHandler<ComponentEvent> { } } + private static class ContainerRecoveredTransition extends BaseTransition { + @Override + public void transition(Component component, ComponentEvent event) { + ComponentInstance instance = event.getInstance(); + Container container = event.getContainer(); + if (instance == null) { + LOG.info("[COMPONENT {}]: Trying to recover {} but event did not " + + "specify component instance", + component.getName(), container.getId()); + component.releaseContainer(container); + return; + } + if (instance.hasContainer()) { + LOG.info( + "[COMPONENT {}]: Instance {} already has container, release " + + "surplus container {}", + instance.getCompName(), instance.getCompInstanceId(), container + .getId()); + component.releaseContainer(container); + return; + } + component.pendingInstances.remove(instance); + LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " + + "host {}, num pending component instances reduced to {} ", + component.getName(), container.getId(), instance + .getCompInstanceName(), container.getNodeId(), component + .pendingInstances.size()); + instance.setContainer(container); + ProviderUtils.initCompInstanceDir(component.getContext().fs, instance); + component.getScheduler().addLiveCompInstance(container.getId(), instance); + LOG.info("[COMPONENT {}]: Marking {} as started for component " + + "instance {}", component.getName(), event.getContainer().getId(), + instance.getCompInstanceId()); + component.compInstanceDispatcher.getEventHandler().handle( + new ComponentInstanceEvent(instance.getContainerId(), + START)); + component.incRunningContainers(); + } + } + private static class ContainerStartedTransition implements MultipleArcTransition<Component,ComponentEvent,ComponentState> { @@ -280,14 +324,18 @@ public class Component implements EventHandler<ComponentEvent> { return componentMetrics; } + private void releaseContainer(Container container) { + scheduler.getAmRMClient().releaseAssignedContainer(container.getId()); + componentMetrics.surplusContainers.incr(); + scheduler.getServiceMetrics().surplusContainers.incr(); + } + private void assignContainerToCompInstance(Container container) { if (pendingInstances.size() == 0) { LOG.info( "[COMPONENT {}]: No pending component instance left, release surplus container {}", getName(), container.getId()); - scheduler.getAmRMClient().releaseAssignedContainer(container.getId()); - componentMetrics.surplusContainers.incr(); - scheduler.getServiceMetrics().surplusContainers.incr(); + releaseContainer(container); return; } ComponentInstance instance = pendingInstances.remove(0); @@ -397,7 +445,7 @@ public class Component implements EventHandler<ComponentEvent> { } for (String dependency : dependencies) { Collection<ComponentInstance> instances = scheduler.getAllComponents() - .get(dependency).getAllComponentInstances().values(); + .get(dependency).getAllComponentInstances(); for (ComponentInstance instance : instances) { if (instance.getContainerStatus() == null) { continue; @@ -447,8 +495,12 @@ public class Component implements EventHandler<ComponentEvent> { return componentMetrics.containersDesired.value(); } - public Map<ComponentInstanceId, ComponentInstance> getAllComponentInstances() { - return compInstances; + public ComponentInstance getComponentInstance(String componentInstanceName) { + return compInstances.get(componentInstanceName); + } + + public Collection<ComponentInstance> getAllComponentInstances() { + return compInstances.values(); } public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java index 6729699..067302d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.component; public enum ComponentEventType { FLEX, CONTAINER_ALLOCATED, + CONTAINER_RECOVERED, CONTAINER_STARTED, CONTAINER_COMPLETED } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java index 3c1e48f..68c0537 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.registry.client.binding.RegistryPathUtils; import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; -import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.Container; @@ -35,6 +34,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.service.ServiceScheduler; import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.component.Component; @@ -143,10 +144,19 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, compInstance.getContainerId(), compInstance), 0, 1, TimeUnit.SECONDS); + long containerStartTime = System.currentTimeMillis(); + try { + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier(compInstance.getContainer() + .getContainerToken()); + containerStartTime = containerTokenIdentifier.getCreationTime(); + } catch (Exception e) { + LOG.info("Could not get container creation time, using current time"); + } org.apache.hadoop.yarn.service.api.records.Container container = new org.apache.hadoop.yarn.service.api.records.Container(); container.setId(compInstance.getContainerId().toString()); - container.setLaunchTime(new Date()); + container.setLaunchTime(new Date(containerStartTime)); container.setState(ContainerState.RUNNING_BUT_UNREADY); container.setBareHost(compInstance.container.getNodeId().getHost()); container.setComponentName(compInstance.getCompInstanceName()); @@ -156,7 +166,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, } compInstance.containerSpec = container; compInstance.getCompSpec().addContainer(container); - compInstance.containerStartedTime = System.currentTimeMillis(); + compInstance.containerStartedTime = containerStartTime; if (compInstance.timelineServiceEnabled) { compInstance.serviceTimelinePublisher @@ -243,6 +253,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, } ExitUtil.terminate(-1); } + + compInstance.removeContainer(); } } @@ -276,6 +288,15 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>, } } + public boolean hasContainer() { + return this.container != null; + } + + public void removeContainer() { + this.container = null; + this.compInstanceId.setContainerId(null); + } + public void setContainer(Container container) { this.container = container; this.compInstanceId.setContainerId(container.getId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java index 93abd73..63fbaae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java @@ -209,21 +209,26 @@ public class ProviderUtils implements YarnServiceConstants { } } + public static Path initCompInstanceDir(SliderFileSystem fs, + ComponentInstance instance) { + Path compDir = new Path(new Path(fs.getAppDir(), "components"), + instance.getCompName()); + Path compInstanceDir = new Path(compDir, instance.getCompInstanceName()); + instance.setCompInstanceDir(compInstanceDir); + return compInstanceDir; + } + // 1. Create all config files for a component on hdfs for localization // 2. Add the config file to localResource public static synchronized void createConfigFileAndAddLocalResource( AbstractLauncher launcher, SliderFileSystem fs, Component component, Map<String, String> tokensForSubstitution, ComponentInstance instance, ServiceContext context) throws IOException { - Path compDir = - new Path(new Path(fs.getAppDir(), "components"), component.getName()); - Path compInstanceDir = - new Path(compDir, instance.getCompInstanceName()); + Path compInstanceDir = initCompInstanceDir(fs, instance); if (!fs.getFileSystem().exists(compInstanceDir)) { log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir); fs.getFileSystem().mkdirs(compInstanceDir, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); - instance.setCompInstanceDir(compInstanceDir); } else { log.info("Component instance conf dir already exists: " + compInstanceDir); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java index 62d7a6a..d418b59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java @@ -48,7 +48,7 @@ public class YarnRegistryViewForProviders { private final RegistryOperations registryOperations; private final String user; - private final String sliderServiceClass; + private final String serviceClass; private final String instanceName; /** * Record used where the service registered itself. @@ -57,20 +57,20 @@ public class YarnRegistryViewForProviders { private ServiceRecord selfRegistration; /** - * Path where record was registered + * Path where record was registered. * Null until the service is registered */ private String selfRegistrationPath; public YarnRegistryViewForProviders(RegistryOperations registryOperations, String user, - String sliderServiceClass, + String serviceClass, String instanceName, ApplicationAttemptId applicationAttemptId) { Preconditions.checkArgument(registryOperations != null, "null registry operations"); Preconditions.checkArgument(user != null, "null user"); - Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass), + Preconditions.checkArgument(SliderUtils.isSet(serviceClass), "unset service class"); Preconditions.checkArgument(SliderUtils.isSet(instanceName), "instanceName"); @@ -78,7 +78,7 @@ public class YarnRegistryViewForProviders { "null applicationAttemptId"); this.registryOperations = registryOperations; this.user = user; - this.sliderServiceClass = sliderServiceClass; + this.serviceClass = serviceClass; this.instanceName = instanceName; } @@ -117,7 +117,7 @@ public class YarnRegistryViewForProviders { } /** - * Add a component under the slider name/entry + * Add a component under the slider name/entry. * @param componentName component name * @param record record to put * @throws IOException @@ -125,13 +125,13 @@ public class YarnRegistryViewForProviders { public void putComponent(String componentName, ServiceRecord record) throws IOException { - putComponent(sliderServiceClass, instanceName, + putComponent(serviceClass, instanceName, componentName, record); } /** - * Add a component + * Add a component. * @param serviceClass service class to use under ~user * @param componentName component name * @param record record to put @@ -146,9 +146,33 @@ public class YarnRegistryViewForProviders { registryOperations.mknode(RegistryPathUtils.parentOf(path), true); registryOperations.bind(path, record, BindFlags.OVERWRITE); } - + + /** + * Get a component. + * @param componentName component name + * @return the service record + * @throws IOException + */ + public ServiceRecord getComponent(String componentName) throws IOException { + String path = RegistryUtils.componentPath( + user, serviceClass, instanceName, componentName); + LOG.info("Resolving path {}", path); + return registryOperations.resolve(path); + } + + /** + * List components. + * @return a list of components + * @throws IOException + */ + public List<String> listComponents() throws IOException { + String path = RegistryUtils.componentListPath( + user, serviceClass, instanceName); + return registryOperations.list(path); + } + /** - * Add a service under a path, optionally purging any history + * Add a service under a path, optionally purging any history. * @param username user * @param serviceClass service class to use under ~user * @param serviceName name of the service @@ -173,7 +197,7 @@ public class YarnRegistryViewForProviders { } /** - * Add a service under a path for the current user + * Add a service under a path for the current user. * @param record service record * @param deleteTreeFirst perform recursive delete of the path first * @return the path the service was created at @@ -183,20 +207,20 @@ public class YarnRegistryViewForProviders { ServiceRecord record, boolean deleteTreeFirst) throws IOException { selfRegistrationPath = - putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst); + putService(user, serviceClass, instanceName, record, deleteTreeFirst); setSelfRegistration(record); return selfRegistrationPath; } /** - * Delete a component + * Delete a component. * @param containerId component name * @throws IOException */ public void deleteComponent(ComponentInstanceId instanceId, String containerId) throws IOException { String path = RegistryUtils.componentPath( - user, sliderServiceClass, instanceName, + user, serviceClass, instanceName, containerId); LOG.info(instanceId + ": Deleting registry path " + path); registryOperations.delete(path, false); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org