Repository: oozie Updated Branches: refs/heads/master 340d55977 -> a5779d75c
OOZIE-2402 oozie-setup.sh sharelib create takes a long time on large clusters (yalovyyi via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a5779d75 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a5779d75 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a5779d75 Branch: refs/heads/master Commit: a5779d75c076f43dcd9cf8d284c0f82d96a88c9d Parents: 340d559 Author: Robert Kanter <[email protected]> Authored: Tue Dec 15 14:24:33 2015 -0800 Committer: Robert Kanter <[email protected]> Committed: Tue Dec 15 14:24:33 2015 -0800 ---------------------------------------------------------------------- distro/src/main/bin/oozie-setup.ps1 | 8 +- distro/src/main/bin/oozie-setup.sh | 8 +- docs/src/site/twiki/AG_Install.twiki | 8 +- release-log.txt | 1 + .../apache/oozie/tools/OozieSharelibCLI.java | 95 ++++++++++++++++++-- .../oozie/tools/TestOozieSharelibCLI.java | 90 +++++++++++++------ 6 files changed, 169 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/distro/src/main/bin/oozie-setup.ps1 ---------------------------------------------------------------------- diff --git a/distro/src/main/bin/oozie-setup.ps1 b/distro/src/main/bin/oozie-setup.ps1 index 18ab973..ca89b88 100644 --- a/distro/src/main/bin/oozie-setup.ps1 +++ b/distro/src/main/bin/oozie-setup.ps1 @@ -25,13 +25,17 @@ Function PrintUsage { param () Write-Output "Usage : oozie-setup.ps1 COMMAND [OPTIONS]" Write-Output " prepare-war [-d directory] [-secure] (-d identifies an alternative directory for processing jars" Write-Output " -secure will configure the war file to use HTTPS (SSL))" - Write-Output " sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] (create sharelib for oozie," + Write-Output " sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] [-concurrency CONCURRENCY]" + Write-Output " (create sharelib for oozie," Write-Output " FS_URI is the fs.default.name" Write-Output " for hdfs uri; SHARED_LIBRARY, path to the" Write-Output " Oozie sharelib to install, it can be a tarball" Write-Output " or an expanded version of it. If ommited," Write-Output " the Oozie sharelib tarball from the Oozie" - Write-Output " installation directory will be used)" + Write-Output " installation directory will be used." + Write-Output " CONCURRENCY is a number of threads to be used" + Write-Output " for copy operations." + Write-Output " By default 1 thread will be used)" Write-Output " (action failes if sharelib is already installed" Write-Output " in HDFS)" Write-Output " sharelib upgrade -fs FS_URI [-locallib SHARED_LIBRARY] ([deprecated]" http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/distro/src/main/bin/oozie-setup.sh ---------------------------------------------------------------------- diff --git a/distro/src/main/bin/oozie-setup.sh b/distro/src/main/bin/oozie-setup.sh index 663d2c9..a41e42d 100644 --- a/distro/src/main/bin/oozie-setup.sh +++ b/distro/src/main/bin/oozie-setup.sh @@ -22,13 +22,17 @@ function printUsage() { echo " Usage : oozie-setup.sh <Command and OPTIONS>" echo " prepare-war [-d directory] [-secure] (-d identifies an alternative directory for processing jars" echo " -secure will configure the war file to use HTTPS (SSL))" - echo " sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] (create sharelib for oozie," + echo " sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] [-concurrency CONCURRENCY]" + echo " (create sharelib for oozie," echo " FS_URI is the fs.default.name" echo " for hdfs uri; SHARED_LIBRARY, path to the" echo " Oozie sharelib to install, it can be a tarball" echo " or an expanded version of it. If ommited," echo " the Oozie sharelib tarball from the Oozie" - echo " installation directory will be used)" + echo " installation directory will be used." + echo " CONCURRENCY is a number of threads to be used" + echo " for copy operations." + echo " By default 1 thread will be used)" echo " (action failes if sharelib is already installed" echo " in HDFS)" echo " sharelib upgrade -fs FS_URI [-locallib SHARED_LIBRARY] (upgrade existing sharelib, fails if there" http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/docs/src/site/twiki/AG_Install.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/AG_Install.twiki b/docs/src/site/twiki/AG_Install.twiki index cd9f610..66c0019 100644 --- a/docs/src/site/twiki/AG_Install.twiki +++ b/docs/src/site/twiki/AG_Install.twiki @@ -68,13 +68,17 @@ The =oozie-setup.sh= script options are: Usage : oozie-setup.sh <OPTIONS>" prepare-war [-d directory] [-secure] (-d identifies an alternative directory for processing jars" -secure will configure the war file to use HTTPS (SSL))" - sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] (create sharelib for oozie," + sharelib create -fs FS_URI [-locallib SHARED_LIBRARY] [-concurrency CONCURRENCY]" + (create sharelib for oozie," FS_URI is the fs.default.name" for hdfs uri; SHARED_LIBRARY, path to the" Oozie sharelib to install, it can be a tarball" or an expanded version of it. If omitted," the Oozie sharelib tarball from the Oozie" - installation directory will be used)" + installation directory will be used." + CONCURRENCY is a number of threads to be used" + for copy operations." + By default 1 thread will be used)" (action fails if sharelib is already installed" in HDFS)" sharelib upgrade -fs FS_URI [-locallib SHARED_LIBRARY] ([deprecated][use create command to create new version] http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index b06735c..01326c8 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2402 oozie-setup.sh sharelib create takes a long time on large clusters (yalovyyi via rkanter) OOZIE-2185 Make oozie cli source conf/oozie-client-env.sh (grimesmi via rkanter) OOZIE-2413 Kerberos credentials can expire if the KDC is slow to respond (rkanter) OOZIE-2411 Add BCC to oozie email action (fdenes via rkanter) http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java index 2d9cf3a..15d0a8b 100644 --- a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java +++ b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java @@ -21,8 +21,17 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Date; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; @@ -48,6 +57,7 @@ public class OozieSharelibCLI { public static final String UPGRADE_CMD = "upgrade"; public static final String LIB_OPT = "locallib"; public static final String FS_OPT = "fs"; + public static final String CONCURRENCY_OPT = "concurrency"; public static final String OOZIE_HOME = "oozie.home.dir"; public static final String SHARE_LIB_PREFIX = "lib_"; @@ -64,9 +74,11 @@ public class OozieSharelibCLI { protected Options createUpgradeOptions(String subCommand){ Option sharelib = new Option(LIB_OPT, true, "Local share library directory"); Option uri = new Option(FS_OPT, true, "URI of the fileSystem to " + subCommand + " oozie share library"); + Option concurrency = new Option(CONCURRENCY_OPT, true, "Number of threads to be used for copy operations. (default=1)"); Options options = new Options(); options.addOption(sharelib); options.addOption(uri); + options.addOption(concurrency); return options; } @@ -99,6 +111,7 @@ public class OozieSharelibCLI { throw new Exception("-fs option must be specified"); } + int threadPoolSize = Integer.valueOf(command.getCommandLine().getOptionValue(CONCURRENCY_OPT, "1")); File srcFile = null; //Check whether user provided locallib @@ -163,7 +176,12 @@ public class OozieSharelibCLI { throw new IOException(srcPath + " cannot be found"); } - fs.copyFromLocalFile(false, srcPath, dstPath); + if (threadPoolSize > 1) { + concurrentCopyFromLocal(fs, threadPoolSize, srcFile, dstPath); + } else { + fs.copyFromLocalFile(false, srcPath, dstPath); + } + services.destroy(); FileUtils.deleteDirectory(temp); @@ -176,22 +194,81 @@ public class OozieSharelibCLI { return 1; } catch (Exception ex) { - System.err.println(); - System.err.println("Error: " + ex.getMessage()); - System.err.println(); - System.err.println("Stack trace for the error was (for debug purposes):"); - System.err.println("--------------------------------------"); - ex.printStackTrace(System.err); - System.err.println("--------------------------------------"); - System.err.println(); + logError(ex.getMessage(), ex); return 1; } } + private void logError(String errorMessage, Throwable ex) { + System.err.println(); + System.err.println("Error: " + errorMessage); + System.err.println(); + System.err.println("Stack trace for the error was (for debug purposes):"); + System.err.println("--------------------------------------"); + ex.printStackTrace(System.err); + System.err.println("--------------------------------------"); + System.err.println(); + } + public String getTimestampDirectory() { SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); Date date = new Date(); return dateFormat.format(date).toString(); } + private void concurrentCopyFromLocal(final FileSystem fs, int threadPoolSize, + File srcFile, final Path dstPath) throws IOException { + List<Future<Void>> futures = Collections.emptyList(); + ExecutorService threadPool = Executors.newFixedThreadPool(threadPoolSize); + try { + futures = copyFolderRecursively(fs, threadPool, srcFile, dstPath); + System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads"); + } finally { + try { + threadPool.shutdown(); + } finally { + checkCopyResults(futures); + } + } + } + + private void checkCopyResults(List<Future<Void>> futures) throws IOException { + Throwable t = null; + for (Future<Void> future : futures) { + try { + future.get(); + } catch (CancellationException ce) { + t = ce; + logError("Copy task was cancelled", ce); + } catch (ExecutionException ee) { + t = ee.getCause(); + logError("Copy task failed with exception", t); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + if (t != null) { + throw new IOException ("At least one copy task failed with exception", t); + } + } + + private List<Future<Void>> copyFolderRecursively(final FileSystem fs, final ExecutorService threadPool, + File srcFile, final Path dstPath) throws IOException { + List<Future<Void>> taskList = new ArrayList<Future<Void>>(); + for (final File file : srcFile.listFiles()) { + final Path trgName = new Path(dstPath, file.getName()); + if (file.isDirectory()) { + taskList.addAll(copyFolderRecursively(fs, threadPool, file, trgName)); + } else { + taskList.add(threadPool.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + fs.copyFromLocalFile(new Path(file.toURI()), trgName); + return null; + } + })); + } + } + return taskList; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a5779d75/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java index 829d979..7fff802 100644 --- a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java +++ b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java @@ -22,55 +22,51 @@ package org.apache.oozie.tools; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileWriter; -import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.io.Writer; import java.net.URI; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.ServiceException; import org.apache.oozie.service.Services; import org.apache.oozie.service.ShareLibService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.test.XTestCase; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.rules.TemporaryFolder; /** * Test OozieSharelibCLI */ public class TestOozieSharelibCLI extends XTestCase { private SecurityManager SECURITY_MANAGER; - private String outPath = "outFolder"; + private final String outPath = "outFolder"; private Services services = null; private Path dstPath = null; private FileSystem fs; + private final TemporaryFolder tmpFolder = new TemporaryFolder(); - @BeforeClass + @Override protected void setUp() throws Exception { SECURITY_MANAGER = System.getSecurityManager(); new LauncherSecurityManager(); + tmpFolder.create(); super.setUp(false); } - @AfterClass + @Override protected void tearDown() throws Exception { System.setSecurityManager(SECURITY_MANAGER); if (services != null) { services.destroy(); } + tmpFolder.delete(); super.tearDown(); - } /** @@ -83,12 +79,15 @@ public class TestOozieSharelibCLI extends XTestCase { try { String[] argsHelp = { "help" }; assertEquals(0, execOozieSharelibCLICommands(argsHelp)); - assertTrue(data.toString().contains( + String helpMessage = data.toString(); + assertTrue(helpMessage.contains( "oozie-setup.sh create <OPTIONS> : create a new timestamped version of oozie sharelib")); - assertTrue(data.toString().contains( + assertTrue(helpMessage.contains( "oozie-setup.sh upgrade <OPTIONS> : [deprecated][use command \"create\" " + "to create new version] upgrade oozie sharelib ")); - assertTrue(data.toString().contains(" oozie-setup.sh help")); + assertTrue(helpMessage.contains(" oozie-setup.sh help")); + assertTrue(helpMessage.contains( + "-concurrency <arg> Number of threads to be used for copy operations.")); } finally { System.setOut(oldPrintStream); @@ -99,17 +98,9 @@ public class TestOozieSharelibCLI extends XTestCase { /** * test copy libraries */ - public void testOozieSharelibCLI() throws Exception { - - File libDirectory = new File(getTestCaseConfDir() + File.separator + "lib"); + public void testOozieSharelibCLICreate() throws Exception { - if (!libDirectory.exists()) { - libDirectory.mkdirs(); - } - else { - FileUtil.fullyDelete(libDirectory); - libDirectory.mkdirs(); - } + File libDirectory = tmpFolder.newFolder("lib"); writeFile(libDirectory, "file1", "test File"); writeFile(libDirectory, "file2", "test File2"); @@ -129,6 +120,46 @@ public class TestOozieSharelibCLI extends XTestCase { } /** + * test parallel copy libraries + */ + public void testOozieSharelibCLICreateConcurrent() throws Exception { + + final int testFiles = 7; + final int concurrency = 5; + + File libDirectory = tmpFolder.newFolder("lib"); + + for (int i = 0; i < testFiles; i++) { + writeFile(libDirectory, generateFileName(i), generateFileContent(i)); + } + + String[] argsCreate = {"create", "-fs", outPath, "-locallib", libDirectory.getParentFile().getAbsolutePath(), + "-concurrency", String.valueOf(concurrency)}; + assertEquals(0, execOozieSharelibCLICommands(argsCreate)); + + getTargetFileSysyem(); + ShareLibService sharelibService = getServices().get(ShareLibService.class); + Path latestLibPath = sharelibService.getLatestLibPath(getDistPath(), + ShareLibService.SHARE_LIB_PREFIX); + + // test files in new folder + + for (int i = 0; i < testFiles; i++) { + String fileName = generateFileName(i); + String expectedFileContent = generateFileContent(i); + InputStream in = null; + try { + in = fs.open(new Path(latestLibPath, fileName)); + String actualFileContent = IOUtils.toString(in); + assertEquals(fileName, expectedFileContent, actualFileContent); + } finally { + IOUtils.closeQuietly(in); + } + } + + } + + /** * test fake command */ public void testFakeCommand() throws Exception { @@ -206,4 +237,11 @@ public class TestOozieSharelibCLI extends XTestCase { return 1; } + private static String generateFileName(int i) { + return "file_" + i; + } + + private static String generateFileContent(int i) { + return "test File " + i; + } }
