YARN-6744. Recover component information on YARN native services AM restart. 
Contributed by Billie Rinaldi


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8a7ef1b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8a7ef1b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8a7ef1b

Branch: refs/heads/trunk
Commit: b8a7ef1b64392094562e7782e0fd092934724ad2
Parents: c723021
Author: Jian He <jia...@apache.org>
Authored: Wed Oct 11 21:05:06 2017 -0700
Committer: Jian He <jia...@apache.org>
Committed: Mon Nov 6 13:30:18 2017 -0800

----------------------------------------------------------------------
 .../hadoop-yarn-services-core/pom.xml           |  5 ++
 .../hadoop/yarn/service/ServiceScheduler.java   | 94 ++++++++++++++------
 .../yarn/service/component/Component.java       | 76 +++++++++++++---
 .../service/component/ComponentEventType.java   |  1 +
 .../component/instance/ComponentInstance.java   | 27 +++++-
 .../yarn/service/provider/ProviderUtils.java    | 15 ++--
 .../registry/YarnRegistryViewForProviders.java  | 52 ++++++++---
 7 files changed, 211 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml
index 205a64d..851f73b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml
@@ -159,6 +159,11 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index ec5f3ed..f3824df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
-import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
@@ -237,11 +237,6 @@ public class ServiceScheduler extends CompositeService {
       serviceTimelinePublisher
           .serviceAttemptUnregistered(context, diagnostics.toString());
     }
-    // Cleanup each component instance. no need to release containers as
-    // they will be automatically released by RM
-    for (ComponentInstance instance : liveInstances.values()) {
-      instance.cleanupRegistryAndCompHdfsDir();
-    }
     String msg = diagnostics.toString()
         + "Navigate to the failed component for more details.";
     amRMClient
@@ -266,11 +261,67 @@ public class ServiceScheduler extends CompositeService {
     }
     registerServiceInstance(context.attemptId, app);
 
-    //TODO handle containers recover
-  }
+    // recover components based on containers sent from RM
+    recoverComponents(response);
 
-  private void recover() {
+    for (Component component : componentsById.values()) {
+      // Trigger initial evaluation of components
+      if (component.areDependenciesReady()) {
+        LOG.info("Triggering initial evaluation of component {}",
+            component.getName());
+        ComponentEvent event = new ComponentEvent(component.getName(), FLEX)
+            .setDesired(component.getComponentSpec().getNumberOfContainers());
+        component.handle(event);
+      }
+    }
+  }
 
+  private void recoverComponents(RegisterApplicationMasterResponse response) {
+    List<Container> recoveredContainers = response
+        .getContainersFromPreviousAttempts();
+    LOG.info("Received {} containers from previous attempt.",
+        recoveredContainers.size());
+    Map<String, ServiceRecord> existingRecords = new HashMap<>();
+    List<String> existingComps = null;
+    try {
+      existingComps = yarnRegistryOperations.listComponents();
+      LOG.info("Found {} containers from ZK registry: {}", existingComps.size(),
+          existingComps);
+    } catch (Exception e) {
+      LOG.info("Could not read component paths: {}", e.getMessage());
+    }
+    if (existingComps != null) {
+      for (String existingComp : existingComps) {
+        try {
+          ServiceRecord record =
+              yarnRegistryOperations.getComponent(existingComp);
+          existingRecords.put(existingComp, record);
+        } catch (Exception e) {
+          LOG.warn("Could not resolve record for component {}: {}",
+              existingComp, e);
+        }
+      }
+    }
+    for (Container container : recoveredContainers) {
+      LOG.info("Handling container {} from previous attempt",
+          container.getId());
+      ServiceRecord record = existingRecords.get(RegistryPathUtils
+          .encodeYarnID(container.getId().toString()));
+      if (record != null) {
+        Component comp = componentsById.get(container.getAllocationRequestId());
+        ComponentEvent event =
+            new ComponentEvent(comp.getName(), CONTAINER_RECOVERED)
+                .setContainer(container)
+                .setInstance(comp.getComponentInstance(record.description));
+        comp.handle(event);
+        // do not remove requests in this case because we do not know if they
+        // have already been removed
+      } else {
+        LOG.info("Record not found in registry for container {} from previous" +
+            " attempt, releasing", container.getId());
+        amRMClient.releaseAssignedContainer(container.getId());
+      }
+    }
   }
 
   private void initGlobalTokensForSubstitute(ServiceContext context) {
@@ -353,7 +404,7 @@ public class ServiceScheduler extends CompositeService {
     executorService.submit(new Runnable() {
       @Override public void run() {
         try {
-          yarnRegistryOperations.registerSelf(serviceRecord, true);
+          yarnRegistryOperations.registerSelf(serviceRecord, false);
           LOG.info("Registered service under {}; absolute path {}",
               yarnRegistryOperations.getSelfRegistrationPath(),
               yarnRegistryOperations.getAbsoluteSelfRegistrationPath());
@@ -398,13 +449,6 @@ public class ServiceScheduler extends CompositeService {
       componentsById.put(allocateId, component);
       componentsByName.put(component.getName(), component);
       allocateId++;
-
-      // Trigger the component without dependencies
-      if (component.areDependenciesReady()) {
-        ComponentEvent event = new ComponentEvent(compSpec.getName(), FLEX)
-            .setDesired(compSpec.getNumberOfContainers());
-        component.handle(event);
-      }
     }
   }
 
@@ -458,17 +502,17 @@ public class ServiceScheduler extends CompositeService {
             new ComponentEvent(comp.getName(), CONTAINER_ALLOCATED)
                 .setContainer(container);
         dispatcher.getEventHandler().handle(event);
+        Collection<AMRMClient.ContainerRequest> requests = amRMClient
+            .getMatchingRequests(container.getAllocationRequestId());
         LOG.info("[COMPONENT {}]: {} outstanding container requests.",
-            comp.getName(),
-            amRMClient.getMatchingRequests(container.getAllocationRequestId()).size());
+            comp.getName(), requests.size());
         // remove the corresponding request
-        Collection<AMRMClient.ContainerRequest> collection = amRMClient
-            .getMatchingRequests(container.getAllocationRequestId());
-        if (collection.iterator().hasNext()) {
-          AMRMClient.ContainerRequest request = collection.iterator().next();
+        if (requests.iterator().hasNext()) {
+          LOG.info("[COMPONENT {}]: removing one container request.", comp
+              .getName());
+          AMRMClient.ContainerRequest request = requests.iterator().next();
           amRMClient.removeContainerRequest(request);
         }
-
       }
     }
 
@@ -478,7 +522,7 @@ public class ServiceScheduler extends CompositeService {
         ContainerId containerId = status.getContainerId();
         ComponentInstance instance = liveInstances.get(status.getContainerId());
         if (instance == null) {
-          LOG.error(
+          LOG.warn(
               "Container {} Completed. No component instance exists. exitStatus={}. diagnostics={} ",
               containerId, status.getExitStatus(), status.getDiagnostics());
           return;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index cbaf472..98bb238 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.service.ServiceContext;
 import org.apache.hadoop.yarn.service.ServiceScheduler;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
 import org.apache.hadoop.yarn.service.ServiceMetrics;
+import org.apache.hadoop.yarn.service.provider.ProviderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
@@ -78,7 +79,7 @@ public class Component implements EventHandler<ComponentEvent> {
   private ServiceContext context;
   private AMRMClientAsync<ContainerRequest> amrmClient;
   private AtomicLong instanceIdCounter = new AtomicLong();
-  private Map<ComponentInstanceId, ComponentInstance> compInstances =
+  private Map<String, ComponentInstance> compInstances =
       new ConcurrentHashMap<>();
   // component instances to be assigned with a container
   private List<ComponentInstance> pendingInstances = new LinkedList<>();
@@ -101,6 +102,9 @@ public class Component implements EventHandler<ComponentEvent> {
            // INIT will only got to FLEXING
           .addTransition(INIT, EnumSet.of(STABLE, FLEXING),
               FLEX, new FlexComponentTransition())
+          // container recovered on AM restart
+          .addTransition(INIT, INIT, CONTAINER_RECOVERED,
+              new ContainerRecoveredTransition())
 
           // container allocated by RM
           .addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
@@ -165,7 +169,7 @@ public class Component implements EventHandler<ComponentEvent> {
         new ComponentInstanceId(instanceIdCounter.getAndIncrement(),
             componentSpec.getName());
     ComponentInstance instance = new ComponentInstance(this, id);
-    compInstances.put(id, instance);
+    compInstances.put(instance.getCompInstanceName(), instance);
     pendingInstances.add(instance);
   }
 
@@ -186,8 +190,8 @@ public class Component implements EventHandler<ComponentEvent> {
         // This happens on init
         LOG.info("[INIT COMPONENT " + component.getName() + "]: " + event
             .getDesired() + " instances.");
-        component.requestContainers(event.getDesired());
-        return FLEXING;
+        component.requestContainers(component.pendingInstances.size());
+        return checkIfStable(component);
       }
       long before = component.getComponentSpec().getNumberOfContainers();
       long delta = event.getDesired() - before;
@@ -205,14 +209,14 @@ public class Component implements EventHandler<ComponentEvent> {
         LOG.info("[FLEX DOWN COMPONENT " + component.getName()
             + "]: scaling down from " + before + " to " + event.getDesired());
         List<ComponentInstance> list =
-            new ArrayList<>(component.compInstances.values());
+            new ArrayList<>(component.getAllComponentInstances());
 
         // sort in Most recent -> oldest order, destroy most recent ones.
         Collections.sort(list, Collections.reverseOrder());
         for (int i = 0; i < delta; i++) {
           ComponentInstance instance = list.get(i);
           // remove the instance
-          component.compInstances.remove(instance.getCompInstanceId());
+          component.compInstances.remove(instance.getCompInstanceName());
           component.pendingInstances.remove(instance);
           component.componentMetrics.containersFailed.incr();
           component.componentMetrics.containersRunning.decr();
@@ -236,6 +240,46 @@ public class Component implements EventHandler<ComponentEvent> {
     }
   }
 
+  private static class ContainerRecoveredTransition extends BaseTransition {
+    @Override
+    public void transition(Component component, ComponentEvent event) {
+      ComponentInstance instance = event.getInstance();
+      Container container = event.getContainer();
+      if (instance == null) {
+        LOG.info("[COMPONENT {}]: Trying to recover {} but event did not " +
+                "specify component instance",
+            component.getName(), container.getId());
+        component.releaseContainer(container);
+        return;
+      }
+      if (instance.hasContainer()) {
+        LOG.info(
+            "[COMPONENT {}]: Instance {} already has container, release " +
+                "surplus container {}",
+            instance.getCompName(), instance.getCompInstanceId(), container
+                .getId());
+        component.releaseContainer(container);
+        return;
+      }
+      component.pendingInstances.remove(instance);
+      LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
+              "host {}, num pending component instances reduced to {} ",
+          component.getName(), container.getId(), instance
+              .getCompInstanceName(), container.getNodeId(), component
+              .pendingInstances.size());
+      instance.setContainer(container);
+      ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
+      component.getScheduler().addLiveCompInstance(container.getId(), instance);
+      LOG.info("[COMPONENT {}]: Marking {} as started for component " +
+          "instance {}", component.getName(), event.getContainer().getId(),
+          instance.getCompInstanceId());
+      component.compInstanceDispatcher.getEventHandler().handle(
+          new ComponentInstanceEvent(instance.getContainerId(),
+              START));
+      component.incRunningContainers();
+    }
+  }
+
   private static class ContainerStartedTransition implements
       MultipleArcTransition<Component,ComponentEvent,ComponentState> {
 
@@ -280,14 +324,18 @@ public class Component implements EventHandler<ComponentEvent> {
     return componentMetrics;
   }
 
+  private void releaseContainer(Container container) {
+    scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
+    componentMetrics.surplusContainers.incr();
+    scheduler.getServiceMetrics().surplusContainers.incr();
+  }
+
   private void assignContainerToCompInstance(Container container) {
     if (pendingInstances.size() == 0) {
       LOG.info(
           "[COMPONENT {}]: No pending component instance left, release surplus container {}",
           getName(), container.getId());
-      scheduler.getAmRMClient().releaseAssignedContainer(container.getId());
-      componentMetrics.surplusContainers.incr();
-      scheduler.getServiceMetrics().surplusContainers.incr();
+      releaseContainer(container);
       return;
     }
     ComponentInstance instance = pendingInstances.remove(0);
@@ -397,7 +445,7 @@ public class Component implements EventHandler<ComponentEvent> {
     }
     for (String dependency : dependencies) {
       Collection<ComponentInstance> instances = scheduler.getAllComponents()
-          .get(dependency).getAllComponentInstances().values();
+          .get(dependency).getAllComponentInstances();
       for (ComponentInstance instance : instances) {
         if (instance.getContainerStatus() == null) {
           continue;
@@ -447,8 +495,12 @@ public class Component implements EventHandler<ComponentEvent> {
     return componentMetrics.containersDesired.value();
   }
 
-  public Map<ComponentInstanceId, ComponentInstance> getAllComponentInstances() {
-    return compInstances;
+  public ComponentInstance getComponentInstance(String componentInstanceName) {
+    return compInstances.get(componentInstanceName);
+  }
+
+  public Collection<ComponentInstance> getAllComponentInstances() {
+    return compInstances.values();
   }
 
   public org.apache.hadoop.yarn.service.api.records.Component getComponentSpec() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
index 6729699..067302d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/ComponentEventType.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.component;
 public enum ComponentEventType {
   FLEX,
   CONTAINER_ALLOCATED,
+  CONTAINER_RECOVERED,
   CONTAINER_STARTED,
   CONTAINER_COMPLETED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
index 3c1e48f..68c0537 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/instance/ComponentInstance.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
-import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -35,6 +34,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.service.ServiceScheduler;
 import org.apache.hadoop.yarn.service.api.records.ContainerState;
 import org.apache.hadoop.yarn.service.component.Component;
@@ -143,10 +144,19 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
                   compInstance.getContainerId(), compInstance), 0, 1,
               TimeUnit.SECONDS);
 
+      long containerStartTime = System.currentTimeMillis();
+      try {
+        ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+            .newContainerTokenIdentifier(compInstance.getContainer()
+                .getContainerToken());
+        containerStartTime = containerTokenIdentifier.getCreationTime();
+      } catch (Exception e) {
+        LOG.info("Could not get container creation time, using current time");
+      }
       org.apache.hadoop.yarn.service.api.records.Container container =
           new org.apache.hadoop.yarn.service.api.records.Container();
       container.setId(compInstance.getContainerId().toString());
-      container.setLaunchTime(new Date());
+      container.setLaunchTime(new Date(containerStartTime));
       container.setState(ContainerState.RUNNING_BUT_UNREADY);
       container.setBareHost(compInstance.container.getNodeId().getHost());
       container.setComponentName(compInstance.getCompInstanceName());
@@ -156,7 +166,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
       }
       compInstance.containerSpec = container;
       compInstance.getCompSpec().addContainer(container);
-      compInstance.containerStartedTime = System.currentTimeMillis();
+      compInstance.containerStartedTime = containerStartTime;
 
       if (compInstance.timelineServiceEnabled) {
         compInstance.serviceTimelinePublisher
@@ -243,6 +253,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
         }
         ExitUtil.terminate(-1);
       }
+
+      compInstance.removeContainer();
     }
   }
 
@@ -276,6 +288,15 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
     }
   }
 
+  public boolean hasContainer() {
+    return this.container != null;
+  }
+
+  public void removeContainer() {
+    this.container = null;
+    this.compInstanceId.setContainerId(null);
+  }
+
   public void setContainer(Container container) {
     this.container = container;
     this.compInstanceId.setContainerId(container.getId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index 93abd73..63fbaae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -209,21 +209,26 @@ public class ProviderUtils implements YarnServiceConstants {
     }
   }
 
+  public static Path initCompInstanceDir(SliderFileSystem fs,
+      ComponentInstance instance) {
+    Path compDir = new Path(new Path(fs.getAppDir(), "components"),
+        instance.getCompName());
+    Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
+    instance.setCompInstanceDir(compInstanceDir);
+    return compInstanceDir;
+  }
+
   // 1. Create all config files for a component on hdfs for localization
   // 2. Add the config file to localResource
   public static synchronized void createConfigFileAndAddLocalResource(
       AbstractLauncher launcher, SliderFileSystem fs, Component component,
       Map<String, String> tokensForSubstitution, ComponentInstance instance,
       ServiceContext context) throws IOException {
-    Path compDir =
-        new Path(new Path(fs.getAppDir(), "components"), component.getName());
-    Path compInstanceDir =
-        new Path(compDir, instance.getCompInstanceName());
+    Path compInstanceDir = initCompInstanceDir(fs, instance);
     if (!fs.getFileSystem().exists(compInstanceDir)) {
       log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
       fs.getFileSystem().mkdirs(compInstanceDir,
           new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
-      instance.setCompInstanceDir(compInstanceDir);
     } else {
       log.info("Component instance conf dir already exists: " + compInstanceDir);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8a7ef1b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java
index 62d7a6a..d418b59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/registry/YarnRegistryViewForProviders.java
@@ -48,7 +48,7 @@ public class YarnRegistryViewForProviders {
 
   private final RegistryOperations registryOperations;
   private final String user;
-  private final String sliderServiceClass;
+  private final String serviceClass;
   private final String instanceName;
   /**
    * Record used where the service registered itself.
@@ -57,20 +57,20 @@ public class YarnRegistryViewForProviders {
   private ServiceRecord selfRegistration;
 
   /**
-   * Path where record was registered
+   * Path where record was registered.
    * Null until the service is registered
    */
   private String selfRegistrationPath;
 
   public YarnRegistryViewForProviders(RegistryOperations registryOperations,
       String user,
-      String sliderServiceClass,
+      String serviceClass,
       String instanceName,
       ApplicationAttemptId applicationAttemptId) {
     Preconditions.checkArgument(registryOperations != null,
         "null registry operations");
     Preconditions.checkArgument(user != null, "null user");
-    Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass),
+    Preconditions.checkArgument(SliderUtils.isSet(serviceClass),
         "unset service class");
     Preconditions.checkArgument(SliderUtils.isSet(instanceName),
         "instanceName");
@@ -78,7 +78,7 @@ public class YarnRegistryViewForProviders {
         "null applicationAttemptId");
     this.registryOperations = registryOperations;
     this.user = user;
-    this.sliderServiceClass = sliderServiceClass;
+    this.serviceClass = serviceClass;
     this.instanceName = instanceName;
   }
 
@@ -117,7 +117,7 @@ public class YarnRegistryViewForProviders {
   }
 
   /**
-   * Add a component under the slider name/entry
+   * Add a component under the slider name/entry.
    * @param componentName component name
    * @param record record to put
    * @throws IOException
@@ -125,13 +125,13 @@ public class YarnRegistryViewForProviders {
   public void putComponent(String componentName,
       ServiceRecord record) throws
       IOException {
-    putComponent(sliderServiceClass, instanceName,
+    putComponent(serviceClass, instanceName,
         componentName,
         record);
   }
 
   /**
-   * Add a component 
+   * Add a component.
    * @param serviceClass service class to use under ~user
    * @param componentName component name
    * @param record record to put
@@ -146,9 +146,33 @@ public class YarnRegistryViewForProviders {
     registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
     registryOperations.bind(path, record, BindFlags.OVERWRITE);
   }
-    
+
+  /**
+   * Get a component.
+   * @param componentName component name
+   * @return the service record
+   * @throws IOException
+   */
+  public ServiceRecord getComponent(String componentName) throws IOException {
+    String path = RegistryUtils.componentPath(
+        user, serviceClass, instanceName, componentName);
+    LOG.info("Resolving path {}", path);
+    return registryOperations.resolve(path);
+  }
+
+  /**
+   * List components.
+   * @return a list of components
+   * @throws IOException
+   */
+  public List<String> listComponents() throws IOException {
+    String path = RegistryUtils.componentListPath(
+        user, serviceClass, instanceName);
+    return registryOperations.list(path);
+  }
+
   /**
-   * Add a service under a path, optionally purging any history
+   * Add a service under a path, optionally purging any history.
    * @param username user
    * @param serviceClass service class to use under ~user
    * @param serviceName name of the service
@@ -173,7 +197,7 @@ public class YarnRegistryViewForProviders {
   }
 
   /**
-   * Add a service under a path for the current user
+   * Add a service under a path for the current user.
    * @param record service record
    * @param deleteTreeFirst perform recursive delete of the path first
    * @return the path the service was created at
@@ -183,20 +207,20 @@ public class YarnRegistryViewForProviders {
       ServiceRecord record,
       boolean deleteTreeFirst) throws IOException {
     selfRegistrationPath =
-        putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst);
+        putService(user, serviceClass, instanceName, record, deleteTreeFirst);
     setSelfRegistration(record);
     return selfRegistrationPath;
   }
 
   /**
-   * Delete a component
+   * Delete a component.
    * @param containerId component name
    * @throws IOException
    */
   public void deleteComponent(ComponentInstanceId instanceId,
       String containerId) throws IOException {
     String path = RegistryUtils.componentPath(
-        user, sliderServiceClass, instanceName,
+        user, serviceClass, instanceName,
         containerId);
     LOG.info(instanceId + ": Deleting registry path " + path);
     registryOperations.delete(path, false);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to