YARN-1151. Ability to configure auxiliary services from HDFS-based JAR files. 
(Xuan Gong via wangda)

Change-Id: Ied37ff11e507fc86847753ba79486652c8fadfe9


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/00ebec89
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/00ebec89
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/00ebec89

Branch: refs/heads/trunk
Commit: 00ebec89f101347a5da44657e388b30c57ed9deb
Parents: d4e63cc
Author: Wangda Tan <wan...@apache.org>
Authored: Fri Apr 6 21:25:57 2018 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Fri Apr 6 21:25:57 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   3 +
 .../containermanager/AuxServices.java           | 160 +++++++++++++++++-
 .../containermanager/ContainerManagerImpl.java  |   3 +-
 .../containermanager/TestAuxServices.java       | 167 +++++++++++++++++--
 4 files changed, 313 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/00ebec89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7a2a3ce..2590b6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2106,6 +2106,9 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_AUX_SERVICES_CLASSPATH =
       NM_AUX_SERVICES + ".%s.classpath";
 
+  public static final String NM_AUX_SERVICE_REMOTE_CLASSPATH =
+      NM_AUX_SERVICES + ".%s.remote-classpath";
+
   public static final String NM_AUX_SERVICES_SYSTEM_CLASSES =
       NM_AUX_SERVICES + ".%s.system-classes";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00ebec89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
index 57cca50..c8b7a76 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 
+import java.io.IOException;
+import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,45 +31,70 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.ServiceStateChangeListener;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
 import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
+import org.apache.hadoop.yarn.util.FSDownload;
 import com.google.common.base.Preconditions;
 
 public class AuxServices extends AbstractService
     implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
 
+  public static final String NM_AUX_SERVICE_DIR = "nmAuxService";
+  public static final FsPermission NM_AUX_SERVICE_DIR_PERM =
+      new FsPermission((short) 0700);
+
   static final String STATE_STORE_ROOT_NAME = "nm-aux-services";
 
   private static final Logger LOG =
        LoggerFactory.getLogger(AuxServices.class);
+  private static final String DEL_SUFFIX = "_DEL_";
 
   protected final Map<String,AuxiliaryService> serviceMap;
   protected final Map<String,ByteBuffer> serviceMetaData;
   private final AuxiliaryLocalPathHandler auxiliaryLocalPathHandler;
+  private final LocalDirsHandlerService dirsHandler;
+  private final DeletionService delService;
+  private final UserGroupInformation userUGI;
 
   private final Pattern p = Pattern.compile("^[A-Za-z_]+[A-Za-z0-9_]*$");
 
-  public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler) {
+  public AuxServices(AuxiliaryLocalPathHandler auxiliaryLocalPathHandler,
+      Context nmContext, DeletionService deletionService) {
     super(AuxServices.class.getName());
     serviceMap =
       Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
     serviceMetaData =
       Collections.synchronizedMap(new HashMap<String,ByteBuffer>());
     this.auxiliaryLocalPathHandler = auxiliaryLocalPathHandler;
+    this.dirsHandler = nmContext.getLocalDirsHandler();
+    this.delService = deletionService;
+    this.userUGI = getRemoteUgi();
     // Obtain services from configuration in init()
   }
 
@@ -125,15 +152,103 @@ public class AuxServices extends AbstractService
         String classKey = String.format(
             YarnConfiguration.NM_AUX_SERVICE_FMT, sName);
         String className = conf.get(classKey);
-        final String appClassPath = conf.get(String.format(
+        final String appLocalClassPath = conf.get(String.format(
             YarnConfiguration.NM_AUX_SERVICES_CLASSPATH, sName));
+        final String appRemoteClassPath = conf.get(String.format(
+            YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, sName));
         AuxiliaryService s = null;
-        boolean useCustomerClassLoader = appClassPath != null
-            && !appClassPath.isEmpty() && className != null
-            && !className.isEmpty();
+        boolean useCustomerClassLoader = ((appLocalClassPath != null
+            && !appLocalClassPath.isEmpty()) ||
+            (appRemoteClassPath != null && !appRemoteClassPath.isEmpty()))
+            && className != null && !className.isEmpty();
         if (useCustomerClassLoader) {
-          s = AuxiliaryServiceWithCustomClassLoader.getInstance(
-              conf, className, appClassPath);
+          // load AuxiliaryService from local class path
+          if (appRemoteClassPath == null || appRemoteClassPath.isEmpty()) {
+            s = AuxiliaryServiceWithCustomClassLoader.getInstance(
+                conf, className, appLocalClassPath);
+          } else {
+            // load AuxiliaryService from remote class path
+            if (appLocalClassPath != null && !appLocalClassPath.isEmpty()) {
+              throw new YarnRuntimeException("The aux serivce:" + sName
+                  + " has configured local classpath:" + appLocalClassPath
+                  + " and remote classpath:" + appRemoteClassPath
+                  + ". Only one of them should be configured.");
+            }
+            FileContext localLFS = getLocalFileContext(conf);
+            // create NM aux-service dir in NM localdir if it does not exist.
+            Path nmAuxDir = dirsHandler.getLocalPathForWrite("."
+                + Path.SEPARATOR + NM_AUX_SERVICE_DIR);
+            if (!localLFS.util().exists(nmAuxDir)) {
+              try {
+                localLFS.mkdir(nmAuxDir, NM_AUX_SERVICE_DIR_PERM, true);
+              } catch (IOException ex) {
+                throw new YarnRuntimeException("Fail to create dir:"
+                    + nmAuxDir.toString(), ex);
+              }
+            }
+            Path src = new Path(appRemoteClassPath);
+            FileContext remoteLFS = getRemoteFileContext(src.toUri(), conf);
+            FileStatus scFileStatus = remoteLFS.getFileStatus(src);
+            if (!scFileStatus.getOwner().equals(
+                this.userUGI.getShortUserName())) {
+              throw new YarnRuntimeException("The remote jarfile owner:"
+                  + scFileStatus.getOwner() + " is not the same as the NM user:"
+                  + this.userUGI.getShortUserName() + ".");
+            }
+            if ((scFileStatus.getPermission().toShort() & 0022) != 0) {
+              throw new YarnRuntimeException("The remote jarfile should not "
+                  + "be writable by group or others. "
+                  + "The current Permission is "
+                  + scFileStatus.getPermission().toShort());
+            }
+            Path dest = null;
+            Path downloadDest = new Path(nmAuxDir,
+                className + "_" + scFileStatus.getModificationTime());
+            // check whether we need to re-download the jar
+            // from remote directory
+            Path targetDirPath = new Path(downloadDest,
+                scFileStatus.getPath().getName());
+            FileStatus[] allSubDirs = localLFS.util().listStatus(nmAuxDir);
+            boolean reDownload = true;
+            for (FileStatus sub : allSubDirs) {
+              if (sub.getPath().getName().equals(downloadDest.getName())) {
+                reDownload = false;
+                dest = new Path(targetDirPath + Path.SEPARATOR + "*");
+                break;
+              } else {
+                if (sub.getPath().getName().contains(className) &&
+                    !sub.getPath().getName().endsWith(DEL_SUFFIX)) {
+                  Path delPath = new Path(sub.getPath().getParent(),
+                      sub.getPath().getName() + DEL_SUFFIX);
+                  localLFS.rename(sub.getPath(), delPath);
+                  LOG.info("delete old aux service jar dir:"
+                      + delPath.toString());
+                  FileDeletionTask deletionTask = new FileDeletionTask(
+                      this.delService, null, delPath, null);
+                  this.delService.delete(deletionTask);
+                }
+              }
+            }
+            if (reDownload) {
+              LocalResource scRsrc = LocalResource.newInstance(
+                  URL.fromURI(src.toUri()),
+                  LocalResourceType.ARCHIVE, LocalResourceVisibility.PRIVATE,
+                  scFileStatus.getLen(), scFileStatus.getModificationTime());
+              FSDownload download = new FSDownload(localLFS, null, conf,
+                  downloadDest, scRsrc, null);
+              try {
+                Path downloaded = download.call();
+                dest = new Path(downloaded + Path.SEPARATOR + "*");
+              } catch (Exception ex) {
+                throw new YarnRuntimeException(
+                    "Exception happend while downloading files "
+                    + "for aux-service:" + sName + " and remote-file-path:"
+                    + src + ".\n" + ex.getMessage());
+              }
+            }
+            s = AuxiliaryServiceWithCustomClassLoader.getInstance(
+                conf, className, dest.toString());
+          }
           LOG.info("The aux service:" + sName
               + " are using the custom classloader");
         } else {
@@ -289,4 +404,33 @@ public class AuxServices extends AbstractService
         : "The auxService name is " + service.getName())
         + " and it got an error at event: " + eventType, th);
   }
+
+  FileContext getLocalFileContext(Configuration conf) {
+    try {
+      return FileContext.getLocalFSFileContext(conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Failed to access local fs");
+    }
+  }
+
+  FileContext getRemoteFileContext(final URI path, Configuration conf) {
+    try {
+      return FileContext.getFileContext(path, conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Failed to access remote fs");
+    }
+  }
+
+  private UserGroupInformation getRemoteUgi() {
+    UserGroupInformation remoteUgi;
+    try {
+      remoteUgi = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      String msg = "Cannot obtain the user-name. Got exception: "
+          + StringUtils.stringifyException(e);
+      LOG.warn(msg);
+      throw new YarnRuntimeException(msg);
+    }
+    return remoteUgi;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00ebec89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 6b4d517..3470910 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -253,7 +253,8 @@ public class ContainerManagerImpl extends CompositeService implements
     AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
         new AuxiliaryLocalPathHandlerImpl(dirsHandler);
     // Start configurable services
-    auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler);
+    auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler,
+        this.context, this.deletionService);
     auxiliaryServices.registerServiceListener(this);
     addService(auxiliaryServices);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/00ebec89/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
index 6d12f8c..fcf92b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
@@ -25,8 +25,12 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import org.mockito.Mockito;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +41,10 @@ import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.FileTime;
+import java.nio.file.attribute.PosixFilePermission;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -46,6 +54,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -61,6 +70,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
@@ -69,8 +79,11 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
 import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -82,7 +95,10 @@ public class TestAuxServices {
           System.getProperty("java.io.tmpdir")),
       TestAuxServices.class.getName());
   private final static AuxiliaryLocalPathHandler MOCK_AUX_PATH_HANDLER =
-      Mockito.mock(AuxiliaryLocalPathHandler.class);
+      mock(AuxiliaryLocalPathHandler.class);
+  private final static Context MOCK_CONTEXT = mock(Context.class);
+  private final static DeletionService MOCK_DEL_SERVICE = mock(
+      DeletionService.class);
 
   static class LightService extends AuxiliaryService implements Service
        {
@@ -188,6 +204,126 @@ public class TestAuxServices {
     }
   }
 
+  @SuppressWarnings("resource")
+  @Test
+  public void testRemoteAuxServiceClassPath() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    FileSystem fs = FileSystem.get(conf);
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+        new String[] {"ServiceC"});
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        "ServiceC"), ServiceC.class, Service.class);
+
+    Context mockContext2 = mock(Context.class);
+    LocalDirsHandlerService mockDirsHandler = mock(
+        LocalDirsHandlerService.class);
+    String root = "target/LocalDir";
+    Path rootAuxServiceDirPath = new Path(root, "nmAuxService");
+    when(mockDirsHandler.getLocalPathForWrite(anyString())).thenReturn(
+        rootAuxServiceDirPath);
+    when(mockContext2.getLocalDirsHandler()).thenReturn(mockDirsHandler);
+
+    File rootDir = GenericTestUtils.getTestDir(getClass()
+        .getSimpleName());
+    if (!rootDir.exists()) {
+      rootDir.mkdirs();
+    }
+    AuxServices aux = null;
+    File testJar = null;
+    try {
+      // the remote jar file should not be be writable by group or others.
+      try {
+        testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
+            "test-runjar.jar", 2048, ServiceC.class.getName());
+        // Give group a write permission.
+        // We should not load the auxservice from remote jar file.
+        Set<PosixFilePermission> perms = new HashSet<PosixFilePermission>();
+        perms.add(PosixFilePermission.OWNER_READ);
+        perms.add(PosixFilePermission.OWNER_WRITE);
+        perms.add(PosixFilePermission.GROUP_WRITE);
+        Files.setPosixFilePermissions(Paths.get(testJar.getAbsolutePath()),
+            perms);
+        conf.set(String.format(
+            YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
+            testJar.getAbsolutePath());
+        aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+            mockContext2, MOCK_DEL_SERVICE);
+        aux.init(conf);
+        Assert.fail("The permission of the jar is wrong."
+            + "Should throw out exception.");
+      } catch (YarnRuntimeException ex) {
+        Assert.assertTrue(ex.getMessage(), ex.getMessage().contains(
+            "The remote jarfile should not be writable by group or others"));
+      }
+
+      Files.delete(Paths.get(testJar.getAbsolutePath()));
+
+      testJar = JarFinder.makeClassLoaderTestJar(this.getClass(), rootDir,
+          "test-runjar.jar", 2048, ServiceC.class.getName());
+      conf.set(String.format(
+          YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH, "ServiceC"),
+          testJar.getAbsolutePath());
+      aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+          mockContext2, MOCK_DEL_SERVICE);
+      aux.init(conf);
+      aux.start();
+      Map<String, ByteBuffer> meta = aux.getMetaData();
+      String auxName = "";
+      Assert.assertTrue(meta.size() == 1);
+      for(Entry<String, ByteBuffer> i : meta.entrySet()) {
+        auxName = i.getKey();
+      }
+      Assert.assertEquals("ServiceC", auxName);
+      aux.serviceStop();
+      FileStatus[] status = fs.listStatus(rootAuxServiceDirPath);
+      Assert.assertTrue(status.length == 1);
+
+      // initialize the same auxservice again, and make sure that we did not
+      // re-download the jar from remote directory.
+      aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+          mockContext2, MOCK_DEL_SERVICE);
+      aux.init(conf);
+      aux.start();
+      meta = aux.getMetaData();
+      Assert.assertTrue(meta.size() == 1);
+      for(Entry<String, ByteBuffer> i : meta.entrySet()) {
+        auxName = i.getKey();
+      }
+      Assert.assertEquals("ServiceC", auxName);
+      verify(MOCK_DEL_SERVICE, times(0)).delete(any(FileDeletionTask.class));
+      status = fs.listStatus(rootAuxServiceDirPath);
+      Assert.assertTrue(status.length == 1);
+      aux.serviceStop();
+
+      // change the last modification time for remote jar,
+      // we will re-download the jar and clean the old jar
+      long time = System.currentTimeMillis() + 3600*1000;
+      FileTime fileTime = FileTime.fromMillis(time);
+      Files.setLastModifiedTime(Paths.get(testJar.getAbsolutePath()),
+          fileTime);
+      conf.set(
+          String.format(YarnConfiguration.NM_AUX_SERVICE_REMOTE_CLASSPATH,
+              "ServiceC"),
+          testJar.getAbsolutePath());
+      aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+          mockContext2, MOCK_DEL_SERVICE);
+      aux.init(conf);
+      aux.start();
+      verify(MOCK_DEL_SERVICE, times(1)).delete(any(FileDeletionTask.class));
+      status = fs.listStatus(rootAuxServiceDirPath);
+      Assert.assertTrue(status.length == 2);
+      aux.serviceStop();
+    } finally {
+      if (testJar != null) {
+        testJar.delete();
+        rootDir.delete();
+      }
+      if (fs.exists(new Path(root))) {
+        fs.delete(new Path(root), true);
+      }
+    }
+  }
+
   // To verify whether we could load class from customized class path.
   // We would use ServiceC in this test. Also create a separate jar file
   // including ServiceC class, and add this jar to customized directory.
@@ -202,7 +338,8 @@ public class TestAuxServices {
     conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
         "ServiceC"), ServiceC.class, Service.class);
     @SuppressWarnings("resource")
-    AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
+    AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+        MOCK_CONTEXT, MOCK_DEL_SERVICE);
     aux.init(conf);
     aux.start();
     Map<String, ByteBuffer> meta = aux.getMetaData();
@@ -244,7 +381,8 @@ public class TestAuxServices {
       conf.set(String.format(
           YarnConfiguration.NM_AUX_SERVICES_SYSTEM_CLASSES,
           "ServiceC"), systemClasses);
-      aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
+      aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+          MOCK_CONTEXT, MOCK_DEL_SERVICE);
       aux.init(conf);
       aux.start();
       meta = aux.getMetaData();
@@ -282,7 +420,8 @@ public class TestAuxServices {
         ServiceB.class, Service.class);
     conf.setInt("A.expected.init", 1);
     conf.setInt("B.expected.stop", 1);
-    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
+    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+        MOCK_CONTEXT, MOCK_DEL_SERVICE);
     aux.init(conf);
     aux.start();
 
@@ -346,7 +485,8 @@ public class TestAuxServices {
         ServiceA.class, Service.class);
     conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
         ServiceB.class, Service.class);
-    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
+    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+        MOCK_CONTEXT, MOCK_DEL_SERVICE);
     aux.init(conf);
 
     int latch = 1;
@@ -379,7 +519,8 @@ public class TestAuxServices {
         ServiceA.class, Service.class);
     conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
         ServiceB.class, Service.class);
-    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
+    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+        MOCK_CONTEXT, MOCK_DEL_SERVICE);
     aux.init(conf);
 
     int latch = 1;
@@ -416,7 +557,8 @@ public class TestAuxServices {
         ServiceA.class, Service.class);
     conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
         ServiceB.class, Service.class);
-    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
+    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+        MOCK_CONTEXT, MOCK_DEL_SERVICE);
     aux.init(conf);
     aux.start();
 
@@ -429,7 +571,8 @@ public class TestAuxServices {
 
   @Test
   public void testValidAuxServiceName() {
-    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
+    final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+        MOCK_CONTEXT, MOCK_DEL_SERVICE);
     Configuration conf = new Configuration();
     conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"Asrv1", "Bsrv_2"});
     conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Asrv1"),
@@ -443,7 +586,8 @@ public class TestAuxServices {
     }
 
     //Test bad auxService Name
-    final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER);
+    final AuxServices aux1 = new AuxServices(MOCK_AUX_PATH_HANDLER,
+        MOCK_CONTEXT, MOCK_DEL_SERVICE);
     conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, new String[] {"1Asrv1"});
     conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "1Asrv1"),
         ServiceA.class, Service.class);
@@ -469,7 +613,8 @@ public class TestAuxServices {
     conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, "Bsrv"),
         RecoverableServiceB.class, Service.class);
     try {
-      final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER);
+      final AuxServices aux = new AuxServices(MOCK_AUX_PATH_HANDLER,
+          MOCK_CONTEXT, MOCK_DEL_SERVICE);
       aux.init(conf);
       Assert.assertEquals(2, aux.getServices().size());
       File auxStorageDir = new File(TEST_DIR,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to