Repository: hadoop
Updated Branches:
  refs/heads/trunk 00905efab -> 00ebec89f


YARN-8048. Support auto-spawning of admin configured services during bootstrap 
of RM (Rohith Sharma K S via wangda)

Change-Id: I2d8d61ccad55e1118009294d7e17822df3cd0fd5


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d4e63ccc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d4e63ccc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d4e63ccc

Branch: refs/heads/trunk
Commit: d4e63ccca0763b452e4a0169dd932b3f32066281
Parents: 00905ef
Author: Wangda Tan <wan...@apache.org>
Authored: Fri Apr 6 21:24:58 2018 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Fri Apr 6 21:24:58 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   4 +
 .../hadoop-yarn-services-api/pom.xml            |   5 +
 .../client/SystemServiceManagerImpl.java        | 381 +++++++++++++++++++
 .../service/client/TestSystemServiceImpl.java   | 180 +++++++++
 .../users/sync/user1/example-app1.yarnfile      |  16 +
 .../users/sync/user1/example-app2.yarnfile      |  16 +
 .../users/sync/user1/example-app3.json          |  16 +
 .../users/sync/user2/example-app1.yarnfile      |  16 +
 .../users/sync/user2/example-app2.yarnfile      |  16 +
 .../yarn/service/conf/YarnServiceConf.java      |   2 +
 .../yarn/service/TestSystemServiceManager.java  | 156 ++++++++
 .../server/service/SystemServiceManager.java    |  25 ++
 .../yarn/server/service/package-info.java       |  27 ++
 .../server/resourcemanager/ResourceManager.java |  30 +-
 14 files changed, 889 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 41755e2..7a2a3ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -343,6 +343,10 @@ public class YarnConfiguration extends Configuration {
   public static final String YARN_API_SERVICES_ENABLE = "yarn."
       + "webapp.api-service.enable";
 
+  @Private
+  public static final String DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS =
+      "org.apache.hadoop.yarn.service.client.SystemServiceManagerImpl";
+
   public static final String RM_RESOURCE_TRACKER_ADDRESS =
     RM_PREFIX + "resource-tracker.address";
   public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8031;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml
index 7fe2ef6..354c9b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/pom.xml
@@ -71,6 +71,7 @@
         <configuration>
           <excludes>
             <exclude>**/*.json</exclude>
+            <exclude>**/*.yarnfile</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -96,6 +97,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java
new file mode 100644
index 0000000..225f8bd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/SystemServiceManagerImpl.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.service.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.service.SystemServiceManager;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
+
+/**
+ * SystemServiceManager implementation.
+ * Scan for configure system service path.
+ *
+ * The service path structure is as follows:
+ * SYSTEM_SERVICE_DIR_PATH
+ * |---- sync
+ * |     |--- user1
+ * |     |    |---- service1.yarnfile
+ * |     |    |---- service2.yarnfile
+ * |     |--- user2
+ * |     |    |---- service1.yarnfile
+ * |     |    ....
+ * |     |
+ * |---- async
+ * |     |--- user3
+ * |     |    |---- service1.yarnfile
+ * |     |    |---- service2.yarnfile
+ * |     |--- user4
+ * |     |    |---- service1.yarnfile
+ * |     |    ....
+ * |     |
+ *
+ * sync: These services are launched at the time of service start synchronously.
+ *       It is a blocking service start.
+ * async: These services are launched in separate thread without any delay after
+ *       service start. Non-blocking service start.
+ */
+public class SystemServiceManagerImpl extends AbstractService
+    implements SystemServiceManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SystemServiceManagerImpl.class);
+
+  private static final String YARN_FILE_SUFFIX = ".yarnfile";
+  private static final String SYNC = "sync";
+  private static final String ASYNC = "async";
+
+  private FileSystem fs;
+  private Path systemServiceDir;
+  private AtomicBoolean stopExecutors = new AtomicBoolean(false);
+  private Map<String, Set<Service>> syncUserServices = new HashMap<>();
+  private Map<String, Set<Service>> asyncUserServices = new HashMap<>();
+  private UserGroupInformation loginUGI;
+  private Thread serviceLaucher;
+
+  @VisibleForTesting
+  private int skipCounter;
+  @VisibleForTesting
+  private Map<String, Integer> ignoredUserServices =
+      new HashMap<>();
+
+  public SystemServiceManagerImpl() {
+    super(SystemServiceManagerImpl.class.getName());
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String dirPath =
+        conf.get(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY);
+    if (dirPath != null) {
+      systemServiceDir = new Path(dirPath);
+      LOG.info("System Service Directory is configured to {}",
+          systemServiceDir);
+      fs = systemServiceDir.getFileSystem(conf);
+      this.loginUGI = UserGroupInformation.isSecurityEnabled() ?
+          UserGroupInformation.getLoginUser() :
+          UserGroupInformation.getCurrentUser();
+      LOG.info("UserGroupInformation initialized to {}", loginUGI);
+    }
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    scanForUserServices();
+    launchUserService(syncUserServices);
+    // Create a thread and submit services in background otherwise it
+    // block RM switch time.
+    serviceLaucher = new Thread(createRunnable());
+    serviceLaucher.setName("System service launcher");
+    serviceLaucher.start();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping {}", getName());
+    stopExecutors.set(true);
+
+    if (serviceLaucher != null) {
+      serviceLaucher.interrupt();
+      try {
+        serviceLaucher.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted Exception while stopping", ie);
+      }
+    }
+  }
+
+  private Runnable createRunnable() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        launchUserService(asyncUserServices);
+      }
+    };
+  }
+
+  void launchUserService(Map<String, Set<Service>> userServices) {
+    for (Map.Entry<String, Set<Service>> entry : userServices.entrySet()) {
+      String user = entry.getKey();
+      Set<Service> services = entry.getValue();
+      if (services.isEmpty()) {
+        continue;
+      }
+      ServiceClient serviceClient = null;
+      try {
+        UserGroupInformation userUgi = getProxyUser(user);
+        serviceClient = createServiceClient(userUgi);
+        for (Service service : services) {
+          LOG.info("POST: createService = {} user = {}", service, userUgi);
+          try {
+            launchServices(userUgi, serviceClient, service);
+          } catch (IOException | UndeclaredThrowableException e) {
+            if (e.getCause() != null) {
+              LOG.warn(e.getCause().getMessage());
+            } else {
+              String message =
+                  "Failed to create service " + service.getName() + " : ";
+              LOG.error(message, e);
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("System service launcher thread interrupted", e);
+        break;
+      } catch (Exception e) {
+        LOG.error("Error while submitting services for user " + user, e);
+      } finally {
+        if (serviceClient != null) {
+          try {
+            serviceClient.close();
+          } catch (IOException e) {
+            LOG.warn("Error while closing serviceClient for user {}", user);
+          }
+        }
+      }
+    }
+  }
+
+  private ServiceClient createServiceClient(UserGroupInformation userUgi)
+      throws IOException, InterruptedException {
+    ServiceClient serviceClient =
+        userUgi.doAs(new PrivilegedExceptionAction<ServiceClient>() {
+          @Override public ServiceClient run()
+              throws IOException, YarnException {
+            ServiceClient sc = getServiceClient();
+            sc.init(getConfig());
+            sc.start();
+            return sc;
+          }
+        });
+    return serviceClient;
+  }
+
+  private void launchServices(UserGroupInformation userUgi,
+      ServiceClient serviceClient, Service service)
+      throws IOException, InterruptedException {
+    if (service.getState() == ServiceState.STOPPED) {
+      userUgi.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override public Void run() throws IOException, YarnException {
+          serviceClient.actionBuild(service);
+          return null;
+        }
+      });
+      LOG.info("Service {} version {} saved.", service.getName(),
+          service.getVersion());
+    } else {
+      ApplicationId applicationId =
+          userUgi.doAs(new PrivilegedExceptionAction<ApplicationId>() {
+            @Override public ApplicationId run()
+                throws IOException, YarnException {
+              ApplicationId applicationId = serviceClient.actionCreate(service);
+              return applicationId;
+            }
+          });
+      LOG.info("Service {} submitted with Application ID: {}",
+          service.getName(), applicationId);
+    }
+  }
+
+  ServiceClient getServiceClient() {
+    return new ServiceClient();
+  }
+
+  private UserGroupInformation getProxyUser(String user) {
+    UserGroupInformation ugi;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      ugi = UserGroupInformation.createProxyUser(user, loginUGI);
+    } else {
+      ugi = UserGroupInformation.createRemoteUser(user);
+    }
+    return ugi;
+  }
+
+  // scan for both launch service types i.e sync and async
+  void scanForUserServices() throws IOException {
+    if (systemServiceDir == null) {
+      return;
+    }
+    try {
+      LOG.info("Scan for launch type on {}", systemServiceDir);
+      RemoteIterator<FileStatus> iterLaunchType = list(systemServiceDir);
+      while (iterLaunchType.hasNext()) {
+        FileStatus launchType = iterLaunchType.next();
+        if (!launchType.isDirectory()) {
+          LOG.debug("Scanner skips for unknown file {}", launchType.getPath());
+          continue;
+        }
+        if (launchType.getPath().getName().equals(SYNC)) {
+          scanForUserServiceDefinition(launchType.getPath(), syncUserServices);
+        } else if (launchType.getPath().getName().equals(ASYNC)) {
+          scanForUserServiceDefinition(launchType.getPath(), asyncUserServices);
+        } else {
+          LOG.debug("Scanner skips for unknown dir {}.", launchType.getPath());
+        }
+      }
+    } catch (FileNotFoundException e) {
+      LOG.warn("System service directory {} doesn't not exist.",
+          systemServiceDir);
+    }
+  }
+
+  // Files are under systemServiceDir/<users>. Scan for 2 levels
+  // 1st level for users
+  // 2nd level for service definitions under user
+  private void scanForUserServiceDefinition(Path userDirPath,
+      Map<String, Set<Service>> userServices) throws IOException {
+    LOG.info("Scan for users on {}", userDirPath);
+    RemoteIterator<FileStatus> iterUsers = list(userDirPath);
+    while (iterUsers.hasNext()) {
+      FileStatus userDir = iterUsers.next();
+      // if 1st level is not user directory then skip it.
+      if (!userDir.isDirectory()) {
+        LOG.info(
+            "Service definition {} doesn't belong to any user. Ignoring.. ",
+            userDir.getPath().getName());
+        continue;
+      }
+      String userName = userDir.getPath().getName();
+      LOG.info("Scanning service definitions for user {}.", userName);
+
+      //2nd level scan
+      RemoteIterator<FileStatus> iterServices = list(userDir.getPath());
+      while (iterServices.hasNext()) {
+        FileStatus serviceCache = iterServices.next();
+        String filename = serviceCache.getPath().getName();
+        if (!serviceCache.isFile()) {
+          LOG.info("Scanner skips for unknown dir {}", filename);
+          continue;
+        }
+        if (!filename.endsWith(YARN_FILE_SUFFIX)) {
+          LOG.info("Scanner skips for unknown file extension, filename = {}",
+              filename);
+          skipCounter++;
+          continue;
+        }
+        Service service = getServiceDefinition(serviceCache.getPath());
+        if (service != null) {
+          Set<Service> services = userServices.get(userName);
+          if (services == null) {
+            services = new HashSet<>();
+            userServices.put(userName, services);
+          }
+          if (!services.add(service)) {
+            int count = ignoredUserServices.containsKey(userName) ?
+                ignoredUserServices.get(userName) : 0;
+            ignoredUserServices.put(userName, count + 1);
+            LOG.warn(
+                "Ignoring service {} for the user {} as it is already present,"
+                    + " filename = {}", service.getName(), userName, filename);
+          }
+          LOG.info("Added service {} for the user {}, filename = {}",
+              service.getName(), userName, filename);
+        }
+      }
+    }
+  }
+
+  private Service getServiceDefinition(Path filePath) {
+    Service service = null;
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Loading service definition from FS: " + filePath);
+      }
+      service = jsonSerDeser.load(fs, filePath);
+    } catch (IOException e) {
+      LOG.info("Error while loading service definition from FS: {}", e);
+    }
+    return service;
+  }
+
+  private RemoteIterator<FileStatus> list(Path path) throws IOException {
+    return new StoppableRemoteIterator(fs.listStatusIterator(path));
+  }
+
+  @VisibleForTesting Map<String, Integer> getIgnoredUserServices() {
+    return ignoredUserServices;
+  }
+
+  private class StoppableRemoteIterator implements RemoteIterator<FileStatus> {
+    private final RemoteIterator<FileStatus> remote;
+
+    StoppableRemoteIterator(RemoteIterator<FileStatus> remote) {
+      this.remote = remote;
+    }
+
+    @Override public boolean hasNext() throws IOException {
+      return !stopExecutors.get() && remote.hasNext();
+    }
+
+    @Override public FileStatus next() throws IOException {
+      return remote.next();
+    }
+  }
+
+  @VisibleForTesting
+  Map<String, Set<Service>> getSyncUserServices() {
+    return syncUserServices;
+  }
+
+  @VisibleForTesting int getSkipCounter() {
+    return skipCounter;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java
new file mode 100644
index 0000000..27632f9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/client/TestSystemServiceImpl.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Test class for system service manager.
+ */
+public class TestSystemServiceImpl {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSystemServiceImpl.class);
+  private SystemServiceManagerImpl systemService;
+  private Configuration conf;
+  private String resourcePath = "users";
+
+  private String[] users = new String[] {"user1", "user2"};
+  private static Map<String, Set<String>> loadedServices = new HashMap<>();
+  private static Map<String, Set<String>> submittedServices = new HashMap<>();
+
+  @Before
+  public void setup() {
+    File file = new File(
+        getClass().getClassLoader().getResource(resourcePath).getFile());
+    conf = new Configuration();
+    conf.set(YarnServiceConf.YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY,
+        file.getAbsolutePath());
+    systemService = new SystemServiceManagerImpl() {
+      @Override ServiceClient getServiceClient() {
+        return new TestServiceClient();
+      }
+    };
+    systemService.init(conf); // do not call explicit start
+
+    constructUserService(users[0], "example-app1");
+    constructUserService(users[1], "example-app1", "example-app2");
+  }
+
+  @After
+  public void teadDown() {
+    systemService.stop();
+  }
+
+  @Test
+  public void testSystemServiceSubmission() throws Exception {
+    systemService.start();
+
+    /* verify for ignored sevices count */
+    Map<String, Integer> ignoredUserServices =
+        systemService.getIgnoredUserServices();
+    Assert.assertEquals(1, ignoredUserServices.size());
+    Assert.assertTrue("User user1 doesn't exist.",
+        ignoredUserServices.containsKey(users[0]));
+    int count = ignoredUserServices.get(users[0]);
+    Assert.assertEquals(1, count);
+    Assert.assertEquals(1, systemService.getSkipCounter());
+
+    Map<String, Set<Service>> userServices =
+        systemService.getSyncUserServices();
+    Assert.assertEquals(loadedServices.size(), userServices.size());
+    verifyForScannedUserServices(userServices);
+
+    verifyForLaunchedUserServices();
+
+    // 2nd time launch service to handle if service exist scenario
+    systemService.launchUserService(userServices);
+    verifyForLaunchedUserServices();
+  }
+
+  private void verifyForScannedUserServices(
+      Map<String, Set<Service>> userServices) {
+    for (String user : users) {
+      Set<Service> services = userServices.get(user);
+      Set<String> serviceNames = loadedServices.get(user);
+      Assert.assertEquals(serviceNames.size(), services.size());
+      Iterator<Service> iterator = services.iterator();
+      while (iterator.hasNext()) {
+        Service next = iterator.next();
+        Assert.assertTrue(
+            "Service name doesn't exist in expected " + "userService "
+                + serviceNames, serviceNames.contains(next.getName()));
+      }
+    }
+  }
+
+  public void constructUserService(String user, String... serviceNames) {
+    Set<String> service = loadedServices.get(user);
+    if (service == null) {
+      service = new HashSet<>();
+      for (String serviceName : serviceNames) {
+        service.add(serviceName);
+      }
+      loadedServices.put(user, service);
+    }
+  }
+
+  class TestServiceClient extends ServiceClient {
+    @Override
+    protected void serviceStart() throws Exception {
+      // do nothing
+    }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      // do nothing
+    }
+
+    @Override
+    protected void serviceInit(Configuration configuration)
+        throws Exception {
+      // do nothing
+    }
+
+    @Override
+    public ApplicationId actionCreate(Service service)
+        throws YarnException, IOException {
+      String userName =
+          UserGroupInformation.getCurrentUser().getShortUserName();
+      Set<String> services = submittedServices.get(userName);
+      if (services == null) {
+        services = new HashSet<>();
+        submittedServices.put(userName, services);
+      }
+      if (services.contains(service.getName())) {
+        String message = "Failed to create service " + service.getName()
+            + ", because it already exists.";
+        throw new YarnException(message);
+      }
+      services.add(service.getName());
+      return ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    }
+  }
+
+  private void verifyForLaunchedUserServices() {
+    Assert.assertEquals(loadedServices.size(), submittedServices.size());
+    for (Map.Entry<String, Set<String>> entry : submittedServices.entrySet()) {
+      String user = entry.getKey();
+      Set<String> serviceSet = entry.getValue();
+      Assert.assertTrue(loadedServices.containsKey(user));
+      Set<String> services = loadedServices.get(user);
+      Assert.assertEquals(services.size(), serviceSet.size());
+      Assert.assertTrue(services.containsAll(serviceSet));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app1.yarnfile
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app1.yarnfile b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app1.yarnfile
new file mode 100644
index 0000000..823561d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app1.yarnfile
@@ -0,0 +1,16 @@
+{
+  "name": "example-app1",
+  "version": "1.0.0",
+  "components" :
+  [
+    {
+      "name": "simple",
+      "number_of_containers": 1,
+      "launch_command": "sleep 2",
+      "resource": {
+        "cpus": 1,
+        "memory": "128"
+      }
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app2.yarnfile
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app2.yarnfile b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app2.yarnfile
new file mode 100644
index 0000000..823561d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app2.yarnfile
@@ -0,0 +1,16 @@
+{
+  "name": "example-app1",
+  "version": "1.0.0",
+  "components" :
+  [
+    {
+      "name": "simple",
+      "number_of_containers": 1,
+      "launch_command": "sleep 2",
+      "resource": {
+        "cpus": 1,
+        "memory": "128"
+      }
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app3.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app3.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app3.json
new file mode 100644
index 0000000..8a3a561
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user1/example-app3.json
@@ -0,0 +1,16 @@
+{
+  "name": "example-app3",
+  "version": "1.0.0",
+  "components" :
+  [
+    {
+      "name": "simple",
+      "number_of_containers": 1,
+      "launch_command": "sleep 2",
+      "resource": {
+        "cpus": 1,
+        "memory": "128"
+      }
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app1.yarnfile
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app1.yarnfile b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app1.yarnfile
new file mode 100644
index 0000000..823561d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app1.yarnfile
@@ -0,0 +1,16 @@
+{
+  "name": "example-app1",
+  "version": "1.0.0",
+  "components" :
+  [
+    {
+      "name": "simple",
+      "number_of_containers": 1,
+      "launch_command": "sleep 2",
+      "resource": {
+        "cpus": 1,
+        "memory": "128"
+      }
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app2.yarnfile
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app2.yarnfile b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app2.yarnfile
new file mode 100644
index 0000000..d8fd1d1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/test/resources/users/sync/user2/example-app2.yarnfile
@@ -0,0 +1,16 @@
+{
+  "name": "example-app2",
+  "version": "1.0.0",
+  "components" :
+  [
+    {
+      "name": "simple",
+      "number_of_containers": 1,
+      "launch_command": "sleep 2",
+      "resource": {
+        "cpus": 1,
+        "memory": "128"
+      }
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
index b9a7594..14c4d15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
@@ -50,6 +50,8 @@ public class YarnServiceConf {
   public static final String ROLLING_LOG_INCLUSION_PATTERN = 
"yarn.service.rolling-log.include-pattern";
   public static final String ROLLING_LOG_EXCLUSION_PATTERN = 
"yarn.service.rolling-log.exclude-pattern";
 
+  public static final String YARN_SERVICES_SYSTEM_SERVICE_DIRECTORY =
+      YARN_SERVICE_PREFIX + "system-service.dir";
 
   /**
    * The yarn service base path:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestSystemServiceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestSystemServiceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestSystemServiceManager.java
new file mode 100644
index 0000000..dbff02f
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestSystemServiceManager.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.service.api.records.Artifact;
+import org.apache.hadoop.yarn.service.api.records.ComponentState;
+import org.apache.hadoop.yarn.service.api.records.Service;
+import org.apache.hadoop.yarn.service.api.records.ServiceState;
+import org.apache.hadoop.yarn.service.exceptions.SliderException;
+import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
+import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link ServiceManager}: upgrade and restart state handling.
+ */
+public class TestSystemServiceManager {
+
+  @Rule
+  public ServiceTestUtils.ServiceFSWatcher rule =
+      new ServiceTestUtils.ServiceFSWatcher();
+
+  @Test
+  public void testUpgrade() throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager("testUpgrade");
+    upgrade(serviceManager, "v2", false);
+    Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
+        serviceManager.getServiceSpec().getState());
+  }
+
+  @Test
+  public void testRestartNothingToUpgrade()
+      throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager("testRestart");
+    upgrade(serviceManager, "v2", false);
+    // Make all components stable before sending START.
+    serviceManager.getServiceSpec().getComponents().forEach(
+        comp -> comp.setState(ComponentState.STABLE));
+    serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+    Assert.assertEquals("service not re-started", ServiceState.STABLE,
+        serviceManager.getServiceSpec().getState());
+  }
+
+  @Test
+  public void testRestartWithPendingUpgrade()
+      throws IOException, SliderException {
+    ServiceManager serviceManager = createTestServiceManager("testRestart");
+    upgrade(serviceManager, "v2", true);
+    serviceManager.handle(new ServiceEvent(ServiceEventType.START));
+    Assert.assertEquals("service should still be upgrading",
+        ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
+  }
+
+  /**
+   * Persists an upgraded definition of the service and sends the matching
+   * UPGRADE event; a new artifact is set when upgradeArtifact is true.
+   */
+  private void upgrade(ServiceManager service, String version,
+      boolean upgradeArtifact) throws IOException, SliderException {
+    Service upgradedDef = ServiceTestUtils.createExampleApplication();
+    upgradedDef.setName(service.getName());
+    upgradedDef.setVersion(version);
+    if (upgradeArtifact) {
+      Artifact upgradedArtifact = createTestArtifact("2");
+      upgradedDef.getComponents().forEach(
+          component -> component.setArtifact(upgradedArtifact));
+    }
+    writeUpgradedDef(upgradedDef);
+    ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
+    // Use the same version the upgraded definition was persisted under.
+    upgradeEvent.setVersion(version);
+    service.handle(upgradeEvent);
+  }
+
+  /** Builds a ServiceManager whose scheduler uses a mocked registry view. */
+  private ServiceManager createTestServiceManager(String name)
+      throws IOException {
+    ServiceContext context = new ServiceContext();
+    context.service = createBaseDef(name);
+    context.fs = rule.getFs();
+    context.scheduler = new ServiceScheduler(context) {
+      @Override
+      protected YarnRegistryViewForProviders createYarnRegistryOperations(
+          ServiceContext context, RegistryOperations registryClient) {
+        return mock(YarnRegistryViewForProviders.class);
+      }
+    };
+    context.scheduler.init(rule.getConf());
+
+    // Register a runtime Component for every component in the definition.
+    Map<String, org.apache.hadoop.yarn.service.component.Component>
+        componentState = context.scheduler.getAllComponents();
+    context.service.getComponents().forEach(component ->
+        componentState.put(component.getName(),
+            new org.apache.hadoop.yarn.service.component.Component(
+                component, 1L, context)));
+    return new ServiceManager(context);
+  }
+
+  /** Creates a STARTED example service whose components use artifact "1". */
+  static Service createBaseDef(String name) {
+    ApplicationId applicationId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    Service serviceDef = ServiceTestUtils.createExampleApplication();
+    serviceDef.setId(applicationId.toString());
+    serviceDef.setName(name);
+    serviceDef.setState(ServiceState.STARTED);
+    Artifact artifact = createTestArtifact("1");
+    serviceDef.getComponents().forEach(
+        component -> component.setArtifact(artifact));
+    return serviceDef;
+  }
+
+  /** Creates a TARBALL artifact with the given id. */
+  static Artifact createTestArtifact(String artifactId) {
+    Artifact artifact = new Artifact();
+    artifact.setId(artifactId);
+    artifact.setType(Artifact.TypeEnum.TARBALL);
+    return artifact;
+  }
+
+  /** Persists the definition under its cluster upgrade directory. */
+  private void writeUpgradedDef(Service upgradedDef)
+      throws IOException, SliderException {
+    Path upgradePath = rule.getFs().buildClusterUpgradeDirPath(
+        upgradedDef.getName(), upgradedDef.getVersion());
+    ServiceApiUtil.createDirAndPersistApp(rule.getFs(), upgradePath,
+        upgradedDef);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/SystemServiceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/SystemServiceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/SystemServiceManager.java
new file mode 100644
index 0000000..660f378
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/SystemServiceManager.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.service;
+
+/**
+ * Marker interface for starting services from the ResourceManager (RM).
+ * Implementations are expected to launch the admin-configured services.
+ */
+public interface SystemServiceManager {
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java
new file mode 100644
index 0000000..c448bab
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/service/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.service contains service related
+ * classes.
+ */
+@InterfaceAudience.Private @InterfaceStability.Unstable
+
+package org.apache.hadoop.yarn.server.service;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4e63ccc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 733da5b..f5d84a3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -107,6 +107,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineC
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.service.SystemServiceManager;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@@ -310,7 +311,7 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
     }
 
     rmContext.setYarnConfiguration(conf);
-    
+
     createAndInitActiveServices(false);
 
     webAppAddress = WebAppUtils.getWebAppBindURL(this.conf,
@@ -486,6 +487,27 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
     }
   }
 
+  /** Reflectively creates the default SystemServiceManager implementation. */
+  protected SystemServiceManager createServiceManager() {
+    String managerClassName =
+        YarnConfiguration.DEFAULT_YARN_API_SYSTEM_SERVICES_CLASS;
+    LOG.info("Using SystemServiceManager: " + managerClassName);
+    try {
+      Class<?> managerClass = Class.forName(managerClassName);
+      if (!SystemServiceManager.class.isAssignableFrom(managerClass)) {
+        throw new YarnRuntimeException(
+            "Class: " + managerClassName + " not instance of "
+                + SystemServiceManager.class.getCanonicalName());
+      }
+      return (SystemServiceManager) ReflectionUtils
+          .newInstance(managerClass, this.conf);
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate SystemServiceManager: " + managerClassName,
+          e);
+    }
+  }
+
   protected ApplicationMasterLauncher createAMLauncher() {
     return new ApplicationMasterLauncher(this.rmContext);
   }
@@ -795,6 +817,12 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
 
       new RMNMInfo(rmContext, scheduler);
 
+      if (conf.getBoolean(YarnConfiguration.YARN_API_SERVICES_ENABLE,
+          false)) {
+        SystemServiceManager systemServiceManager = createServiceManager();
+        addIfService(systemServiceManager);
+      }
+
       super.serviceInit(conf);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to