Repository: oozie
Updated Branches:
  refs/heads/master 340d55977 -> a5779d75c


OOZIE-2402 oozie-setup.sh sharelib create takes a long time on large clusters (yalovyyi via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a5779d75
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a5779d75
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a5779d75

Branch: refs/heads/master
Commit: a5779d75c076f43dcd9cf8d284c0f82d96a88c9d
Parents: 340d559
Author: Robert Kanter <[email protected]>
Authored: Tue Dec 15 14:24:33 2015 -0800
Committer: Robert Kanter <[email protected]>
Committed: Tue Dec 15 14:24:33 2015 -0800

----------------------------------------------------------------------
 distro/src/main/bin/oozie-setup.ps1             |  8 +-
 distro/src/main/bin/oozie-setup.sh              |  8 +-
 docs/src/site/twiki/AG_Install.twiki            |  8 +-
 release-log.txt                                 |  1 +
 .../apache/oozie/tools/OozieSharelibCLI.java    | 95 ++++++++++++++++++--
 .../oozie/tools/TestOozieSharelibCLI.java       | 90 +++++++++++++------
 6 files changed, 169 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/distro/src/main/bin/oozie-setup.ps1
----------------------------------------------------------------------
diff --git a/distro/src/main/bin/oozie-setup.ps1 b/distro/src/main/bin/oozie-setup.ps1
index 18ab973..ca89b88 100644
--- a/distro/src/main/bin/oozie-setup.ps1
+++ b/distro/src/main/bin/oozie-setup.ps1
@@ -25,13 +25,17 @@ Function PrintUsage { param ()
     Write-Output  "Usage  : oozie-setup.ps1 COMMAND [OPTIONS]"
     Write-Output  "          prepare-war [-d directory] [-secure] (-d 
identifies an alternative directory for processing jars"
     Write-Output  "                                                -secure 
will configure the war file to use HTTPS (SSL))"
-    Write-Output  "          sharelib create -fs FS_URI [-locallib 
SHARED_LIBRARY] (create sharelib for oozie,"
+    Write-Output  "          sharelib create -fs FS_URI [-locallib 
SHARED_LIBRARY] [-concurrency CONCURRENCY]"
+    Write-Output  "                                                            
    (create sharelib for oozie,"
     Write-Output  "                                                            
    FS_URI is the fs.default.name"
     Write-Output  "                                                            
    for hdfs uri; SHARED_LIBRARY, path to the"
     Write-Output  "                                                            
    Oozie sharelib to install, it can be a tarball"
     Write-Output  "                                                            
    or an expanded version of it. If ommited,"
     Write-Output  "                                                            
    the Oozie sharelib tarball from the Oozie"
-    Write-Output  "                                                            
    installation directory will be used)"
+    Write-Output  "                                                            
    installation directory will be used."
+    Write-Output  "                                                            
    CONCURRENCY is a number of threads to be used"
+    Write-Output  "                                                            
    for copy operations."
+    Write-Output  "                                                            
    By default 1 thread will be used)"
     Write-Output  "                                                            
    (action failes if sharelib is already installed"
     Write-Output  "                                                            
    in HDFS)"
     Write-Output  "          sharelib upgrade -fs FS_URI [-locallib 
SHARED_LIBRARY] ([deprecated]"

http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/distro/src/main/bin/oozie-setup.sh
----------------------------------------------------------------------
diff --git a/distro/src/main/bin/oozie-setup.sh b/distro/src/main/bin/oozie-setup.sh
index 663d2c9..a41e42d 100644
--- a/distro/src/main/bin/oozie-setup.sh
+++ b/distro/src/main/bin/oozie-setup.sh
@@ -22,13 +22,17 @@ function printUsage() {
   echo " Usage  : oozie-setup.sh <Command and OPTIONS>"
   echo "          prepare-war [-d directory] [-secure] (-d identifies an 
alternative directory for processing jars"
   echo "                                                -secure will configure 
the war file to use HTTPS (SSL))"
-  echo "          sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] 
(create sharelib for oozie,"
+  echo "          sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] 
[-concurrency CONCURRENCY]"
+  echo "                                                                
(create sharelib for oozie,"
   echo "                                                                FS_URI 
is the fs.default.name"
   echo "                                                                for 
hdfs uri; SHARED_LIBRARY, path to the"
   echo "                                                                Oozie 
sharelib to install, it can be a tarball"
   echo "                                                                or an 
expanded version of it. If ommited,"
   echo "                                                                the 
Oozie sharelib tarball from the Oozie"
-  echo "                                                                
installation directory will be used)"
+  echo "                                                                
installation directory will be used."
+  echo "                                                                
CONCURRENCY is a number of threads to be used"
+  echo "                                                                for 
copy operations."
+  echo "                                                                By 
default 1 thread will be used)"
   echo "                                                                
(action failes if sharelib is already installed"
   echo "                                                                in 
HDFS)"
   echo "          sharelib upgrade -fs FS_URI [-locallib SHARED_LIBRARY] 
(upgrade existing sharelib, fails if there"

http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/docs/src/site/twiki/AG_Install.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/AG_Install.twiki b/docs/src/site/twiki/AG_Install.twiki
index cd9f610..66c0019 100644
--- a/docs/src/site/twiki/AG_Install.twiki
+++ b/docs/src/site/twiki/AG_Install.twiki
@@ -68,13 +68,17 @@ The =oozie-setup.sh= script options are:
 Usage  : oozie-setup.sh <OPTIONS>"
          prepare-war [-d directory] [-secure] (-d identifies an alternative 
directory for processing jars"
                                               -secure will configure the war 
file to use HTTPS (SSL))"
-         sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] (create 
sharelib for oozie,"
+         sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] [-concurrency 
CONCURRENCY]"
+                                                               (create 
sharelib for oozie,"
                                                                FS_URI is the 
fs.default.name"
                                                                for hdfs uri; 
SHARED_LIBRARY, path to the"
                                                                Oozie sharelib 
to install, it can be a tarball"
                                                                or an expanded 
version of it. If omitted,"
                                                                the Oozie 
sharelib tarball from the Oozie"
-                                                               installation 
directory will be used)"
+                                                               installation 
directory will be used."
+                                                               CONCURRENCY is 
a number of threads to be used"
+                                                               for copy 
operations."
+                                                               By default 1 
thread will be used)"
                                                                (action fails 
if sharelib is already installed"
                                                                in HDFS)"
          sharelib upgrade -fs FS_URI [-locallib SHARED_LIBRARY] 
([deprecated][use create command to create new version]

http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b06735c..01326c8 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2402 oozie-setup.sh sharelib create takes a long time on large clusters (yalovyyi via rkanter)
 OOZIE-2185 Make oozie cli source conf/oozie-client-env.sh (grimesmi via rkanter)
 OOZIE-2413 Kerberos credentials can expire if the KDC is slow to respond (rkanter)
 OOZIE-2411 Add BCC to oozie email action (fdenes via rkanter)

http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
index 2d9cf3a..15d0a8b 100644
--- a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
+++ b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java
@@ -21,8 +21,17 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -48,6 +57,7 @@ public class OozieSharelibCLI {
     public static final String UPGRADE_CMD = "upgrade";
     public static final String LIB_OPT = "locallib";
     public static final String FS_OPT = "fs";
+    public static final String CONCURRENCY_OPT = "concurrency";
     public static final String OOZIE_HOME = "oozie.home.dir";
     public static final String SHARE_LIB_PREFIX = "lib_";
 
@@ -64,9 +74,11 @@ public class OozieSharelibCLI {
     protected Options createUpgradeOptions(String subCommand){
         Option sharelib = new Option(LIB_OPT, true, "Local share library directory");
         Option uri = new Option(FS_OPT, true, "URI of the fileSystem to " + subCommand + " oozie share library");
+        Option concurrency = new Option(CONCURRENCY_OPT, true, "Number of threads to be used for copy operations. (default=1)");
         Options options = new Options();
         options.addOption(sharelib);
         options.addOption(uri);
+        options.addOption(concurrency);
         return options;
     }
 
@@ -99,6 +111,7 @@ public class OozieSharelibCLI {
                 throw new Exception("-fs option must be specified");
             }
 
+            int threadPoolSize = Integer.valueOf(command.getCommandLine().getOptionValue(CONCURRENCY_OPT, "1"));
             File srcFile = null;
 
             //Check whether user provided locallib
@@ -163,7 +176,12 @@ public class OozieSharelibCLI {
                 throw new IOException(srcPath + " cannot be found");
             }
 
-            fs.copyFromLocalFile(false, srcPath, dstPath);
+            if (threadPoolSize > 1) {
+                concurrentCopyFromLocal(fs, threadPoolSize, srcFile, dstPath);
+            } else {
+                fs.copyFromLocalFile(false, srcPath, dstPath);
+            }
+
             services.destroy();
             FileUtils.deleteDirectory(temp);
 
@@ -176,22 +194,81 @@ public class OozieSharelibCLI {
             return 1;
         }
         catch (Exception ex) {
-            System.err.println();
-            System.err.println("Error: " + ex.getMessage());
-            System.err.println();
-            System.err.println("Stack trace for the error was (for debug purposes):");
-            System.err.println("--------------------------------------");
-            ex.printStackTrace(System.err);
-            System.err.println("--------------------------------------");
-            System.err.println();
+            logError(ex.getMessage(), ex);
             return 1;
         }
     }
 
+    private void logError(String errorMessage, Throwable ex) {
+        System.err.println();
+        System.err.println("Error: " + errorMessage);
+        System.err.println();
+        System.err.println("Stack trace for the error was (for debug purposes):");
+        System.err.println("--------------------------------------");
+        ex.printStackTrace(System.err);
+        System.err.println("--------------------------------------");
+        System.err.println();
+    }
+
     public String getTimestampDirectory() {
         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
         Date date = new Date();
         return dateFormat.format(date).toString();
     }
 
+    private void concurrentCopyFromLocal(final FileSystem fs, int threadPoolSize,
+            File srcFile, final Path dstPath) throws IOException {
+        List<Future<Void>> futures = Collections.emptyList();
+        ExecutorService threadPool = Executors.newFixedThreadPool(threadPoolSize);
+        try {
+            futures = copyFolderRecursively(fs, threadPool, srcFile, dstPath);
+            System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads");
+        } finally {
+            try {
+                threadPool.shutdown();
+            } finally {
+                checkCopyResults(futures);
+            }
+        }
+    }
+
+    private void checkCopyResults(List<Future<Void>> futures) throws IOException {
+        Throwable t = null;
+        for (Future<Void> future : futures) {
+            try {
+                future.get();
+            } catch (CancellationException ce) {
+                t = ce;
+                logError("Copy task was cancelled", ce);
+            } catch (ExecutionException ee) {
+                t = ee.getCause();
+                logError("Copy task failed with exception", t);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        if (t != null) {
+            throw new IOException ("At least one copy task failed with exception", t);
+        }
+    }
+
+    private List<Future<Void>> copyFolderRecursively(final FileSystem fs, final ExecutorService threadPool,
+            File srcFile, final Path dstPath) throws IOException {
+        List<Future<Void>> taskList = new ArrayList<Future<Void>>();
+        for (final File file : srcFile.listFiles()) {
+            final Path trgName = new Path(dstPath, file.getName());
+            if (file.isDirectory()) {
+                taskList.addAll(copyFolderRecursively(fs, threadPool, file, trgName));
+            } else {
+                taskList.add(threadPool.submit(new Callable<Void>() {
+                    @Override
+                    public Void call() throws Exception {
+                        fs.copyFromLocalFile(new Path(file.toURI()), trgName);
+                        return null;
+                    }
+                }));
+            }
+        }
+        return taskList;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
index 829d979..7fff802 100644
--- a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
+++ b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java
@@ -22,55 +22,51 @@ package org.apache.oozie.tools;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.Writer;
 import java.net.URI;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import org.apache.commons.io.IOUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.test.XTestCase;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * Test OozieSharelibCLI
  */
 public class TestOozieSharelibCLI extends XTestCase {
     private SecurityManager SECURITY_MANAGER;
-    private String outPath = "outFolder";
+    private final String outPath = "outFolder";
     private Services services = null;
     private Path dstPath = null;
     private FileSystem fs;
+    private final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-    @BeforeClass
+    @Override
     protected void setUp() throws Exception {
         SECURITY_MANAGER = System.getSecurityManager();
         new LauncherSecurityManager();
+        tmpFolder.create();
         super.setUp(false);
 
     }
 
-    @AfterClass
+    @Override
     protected void tearDown() throws Exception {
         System.setSecurityManager(SECURITY_MANAGER);
         if (services != null) {
             services.destroy();
         }
+        tmpFolder.delete();
         super.tearDown();
-
     }
 
     /**
@@ -83,12 +79,15 @@ public class TestOozieSharelibCLI extends XTestCase {
         try {
             String[] argsHelp = { "help" };
             assertEquals(0, execOozieSharelibCLICommands(argsHelp));
-            assertTrue(data.toString().contains(
+            String helpMessage = data.toString();
+            assertTrue(helpMessage.contains(
                     "oozie-setup.sh create <OPTIONS> : create a new timestamped version of oozie sharelib"));
-            assertTrue(data.toString().contains(
+            assertTrue(helpMessage.contains(
                     "oozie-setup.sh upgrade <OPTIONS> : [deprecated][use command \"create\" "
                             + "to create new version]   upgrade oozie sharelib "));
-            assertTrue(data.toString().contains(" oozie-setup.sh help"));
+            assertTrue(helpMessage.contains(" oozie-setup.sh help"));
+            assertTrue(helpMessage.contains(
+                    "-concurrency <arg>   Number of threads to be used for copy operations."));
         }
         finally {
             System.setOut(oldPrintStream);
@@ -99,17 +98,9 @@ public class TestOozieSharelibCLI extends XTestCase {
     /**
      * test copy libraries
      */
-    public void testOozieSharelibCLI() throws Exception {
-
-        File libDirectory = new File(getTestCaseConfDir() + File.separator + "lib");
+    public void testOozieSharelibCLICreate() throws Exception {
 
-        if (!libDirectory.exists()) {
-            libDirectory.mkdirs();
-        }
-        else {
-            FileUtil.fullyDelete(libDirectory);
-            libDirectory.mkdirs();
-        }
+        File libDirectory = tmpFolder.newFolder("lib");
 
         writeFile(libDirectory, "file1", "test File");
         writeFile(libDirectory, "file2", "test File2");
@@ -129,6 +120,46 @@ public class TestOozieSharelibCLI extends XTestCase {
     }
 
     /**
+     * test parallel copy libraries
+     */
+    public void testOozieSharelibCLICreateConcurrent() throws Exception {
+
+        final int testFiles = 7;
+        final int concurrency = 5;
+
+        File libDirectory = tmpFolder.newFolder("lib");
+
+        for (int i = 0; i < testFiles; i++) {
+            writeFile(libDirectory, generateFileName(i), generateFileContent(i));
+        }
+
+        String[] argsCreate = {"create", "-fs", outPath, "-locallib", libDirectory.getParentFile().getAbsolutePath(),
+            "-concurrency", String.valueOf(concurrency)};
+        assertEquals(0, execOozieSharelibCLICommands(argsCreate));
+
+        getTargetFileSysyem();
+        ShareLibService sharelibService = getServices().get(ShareLibService.class);
+        Path latestLibPath = sharelibService.getLatestLibPath(getDistPath(),
+                ShareLibService.SHARE_LIB_PREFIX);
+
+        // test files in new folder
+
+        for (int i = 0; i < testFiles; i++) {
+            String fileName = generateFileName(i);
+            String expectedFileContent = generateFileContent(i);
+            InputStream in = null;
+            try {
+                in = fs.open(new Path(latestLibPath, fileName));
+                String actualFileContent = IOUtils.toString(in);
+                assertEquals(fileName, expectedFileContent, actualFileContent);
+            } finally {
+                IOUtils.closeQuietly(in);
+            }
+        }
+
+    }
+
+    /**
      * test fake command
      */
     public void testFakeCommand() throws Exception {
@@ -206,4 +237,11 @@ public class TestOozieSharelibCLI extends XTestCase {
         return 1;
     }
 
+    private static String generateFileName(int i) {
+        return "file_" + i;
+    }
+
+    private static String generateFileContent(int i) {
+        return "test File " + i;
+    }
 }

Reply via email to