Repository: oozie Updated Branches: refs/heads/master a299d4a6d -> 6b89aba42
OOZIE-2791 ShareLib installation may fail on busy Hadoop clusters (asasvari, kmarton via pbacsko, andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6b89aba4 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6b89aba4 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6b89aba4 Branch: refs/heads/master Commit: 6b89aba4227326cd125a9ec409f418824cc2cada Parents: a299d4a Author: Andras Piros <[email protected]> Authored: Wed Jul 4 12:29:28 2018 +0200 Committer: Andras Piros <[email protected]> Committed: Wed Jul 4 12:29:28 2018 +0200 ---------------------------------------------------------------------- release-log.txt | 1 + .../apache/oozie/tools/OozieSharelibCLI.java | 281 +++++++++++++++---- .../tools/OozieSharelibFileOperations.java | 74 +++++ .../oozie/tools/TestBlockSizeCalculator.java | 49 ++++ .../tools/TestConcurrentCopyFromLocal.java | 121 ++++++++ .../oozie/tools/TestCopyTaskCallable.java | 145 ++++++++++ .../oozie/tools/TestOozieSharelibCLI.java | 58 ++-- 7 files changed, 640 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 4fad9a2..65628c0 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-2791 ShareLib installation may fail on busy Hadoop clusters (asasvari, kmarton via pbacsko, andras.piros) OOZIE-3297 Retry logic does not handle the exception from BulkJPAExecutor properly (andras.piros) OOZIE-2955 [oozie-client] Fix Findbugs warnings (Jan Hentschel, kmarton via andras.piros) OOZIE-3109 [log-streaming] Escape HTML-specific characters (dionusos via andras.piros) http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java index dce1c55..75e932c 100644 --- a/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java +++ b/tools/src/main/java/org/apache/oozie/tools/OozieSharelibCLI.java @@ -20,12 +20,14 @@ package org.apache.oozie.tools; import java.io.File; import java.io.IOException; import java.net.URI; +import java.nio.file.Files; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -33,19 +35,25 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.oozie.cli.CLIParser; import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; +import org.eclipse.jetty.util.ConcurrentHashSet; public class OozieSharelibCLI { public static final String[] HELP_INFO = { @@ -60,7 +68,6 @@ public class OozieSharelibCLI { public static final String CONCURRENCY_OPT = "concurrency"; public static final String OOZIE_HOME = "oozie.home.dir"; public static final String SHARE_LIB_PREFIX = "lib_"; - private boolean used; public static void main(String[] args) throws Exception{ @@ -181,7 +188,13 @@ public class OozieSharelibCLI { } if (threadPoolSize > 1) { - concurrentCopyFromLocal(fs, threadPoolSize, srcFile, dstPath); + long fsLimitsMinBlockSize = fs.getConf() + .getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT); + long bytesPerChecksum = fs.getConf() + .getLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT); + new ConcurrentCopyFromLocal(threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum) + .concurrentCopyFromLocal(fs, srcFile, dstPath); + } else { fs.copyFromLocalFile(false, srcPath, dstPath); } @@ -197,13 +210,19 @@ public class OozieSharelibCLI { System.err.println(parser.shortHelp()); return 1; } + catch (NumberFormatException ex) { + logError("Invalid configuration value: ", ex); + return 1; + } catch (Exception ex) { logError(ex.getMessage(), ex); return 1; } } - private void logError(String errorMessage, Throwable ex) { + + + private static void logError(String errorMessage, Throwable ex) { System.err.println(); System.err.println("Error: " + errorMessage); System.err.println(); @@ -220,66 +239,228 @@ public class OozieSharelibCLI { return dateFormat.format(date).toString(); } - private void concurrentCopyFromLocal(final FileSystem fs, int threadPoolSize, - File srcFile, final Path dstPath) throws IOException { - List<Future<Void>> futures = Collections.emptyList(); - ExecutorService threadPool = Executors.newFixedThreadPool(threadPoolSize); - try { - futures = copyFolderRecursively(fs, threadPool, srcFile, dstPath); - System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads"); - } finally { + @VisibleForTesting + static final class CopyTaskConfiguration { + private final FileSystem fs; + private final File srcFile; + private final Path dstPath; + + CopyTaskConfiguration(FileSystem fs, File srcFile, Path dstPath) { + this.fs = fs; + this.srcFile = srcFile; + this.dstPath = dstPath; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CopyTaskConfiguration that = (CopyTaskConfiguration) o; + if (!srcFile.equals(that.srcFile)) { + return false; + } + return dstPath.equals(that.dstPath); + } + + @Override + public int hashCode() { + int result = srcFile.hashCode(); + result = 31 * result + dstPath.hashCode(); + return result; + } + + } + + @VisibleForTesting + static final class BlockSizeCalculator { + + protected static long getValidBlockSize (long fileLenght, long fsLimitsMinBlockSize, long bytesPerChecksum) { + if (fsLimitsMinBlockSize > fileLenght) { + return fsLimitsMinBlockSize; + } + // bytesPerChecksum must divide block size + if (fileLenght % bytesPerChecksum == 0) { + return fileLenght; + } + long ratio = fileLenght/bytesPerChecksum; + return (ratio + 1) * bytesPerChecksum; + } + } + + @VisibleForTesting + static final class CopyTaskCallable implements Callable<CopyTaskConfiguration> { + + private final static short REPLICATION_FACTOR = 3; + private final FileSystem fileSystem; + private final File file; + private final Path destinationPath; + private final Path targetName; + private final long blockSize; + + private final Set<CopyTaskConfiguration> failedCopyTasks; + + CopyTaskCallable(CopyTaskConfiguration copyTask, File file, Path trgName, long blockSize, + Set<CopyTaskConfiguration> failedCopyTasks) { + Preconditions.checkNotNull(copyTask); + Preconditions.checkNotNull(file); + Preconditions.checkNotNull(trgName); + Preconditions.checkNotNull(failedCopyTasks); + Preconditions.checkNotNull(copyTask.dstPath); + Preconditions.checkNotNull(copyTask.fs); + this.file = file; + this.destinationPath = copyTask.dstPath; + this.failedCopyTasks = failedCopyTasks; + this.fileSystem = copyTask.fs; + this.blockSize = blockSize; + this.targetName = trgName; + } + + @Override + public CopyTaskConfiguration call() throws Exception { + CopyTaskConfiguration cp = new CopyTaskConfiguration(fileSystem, file, targetName); + failedCopyTasks.add(cp); + final Path destinationFilePath = new Path(destinationPath + File.separator + file.getName()); + final boolean overwrite = true; + final int bufferSize = CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; + try (FSDataOutputStream out = fileSystem + .create(destinationFilePath, overwrite, bufferSize, REPLICATION_FACTOR, blockSize)) { + Files.copy(file.toPath(), out); + } + return cp; + } + } + + @VisibleForTesting + static final class ConcurrentCopyFromLocal { + + private static final int DEFAULT_RETRY_COUNT = 5; + private static final int STARTING_RETRY_DELAY_IN_MS = 1000; + private int retryCount; + private int retryDelayInMs; + private long fsLimitsMinBlockSize; + private long bytesPerChecksum; + + private final int threadPoolSize; + private final ExecutorService threadPool; + private final Set<CopyTaskConfiguration> failedCopyTasks = new ConcurrentHashSet<>(); + + public ConcurrentCopyFromLocal(int threadPoolSize, long fsLimitsMinBlockSize, long bytesPerChecksum) { + Preconditions.checkArgument(threadPoolSize > 0, "Thread Pool size must be greater than 0"); + Preconditions.checkArgument(fsLimitsMinBlockSize > 0, "Minimun block size must be greater than 0"); + Preconditions.checkArgument(bytesPerChecksum > 0, "Bytes per checksum must be greater than 0"); + this.bytesPerChecksum = bytesPerChecksum; + this.fsLimitsMinBlockSize = fsLimitsMinBlockSize; + this.threadPoolSize = threadPoolSize; + this.threadPool = Executors.newFixedThreadPool(threadPoolSize); + this.retryCount = DEFAULT_RETRY_COUNT; + this.retryDelayInMs = STARTING_RETRY_DELAY_IN_MS; + } + + @VisibleForTesting + void concurrentCopyFromLocal(FileSystem fs, File srcFile, Path dstPath) throws IOException { + List<Future<CopyTaskConfiguration>> futures = Collections.emptyList(); + CopyTaskConfiguration copyTask = new CopyTaskConfiguration(fs, srcFile, dstPath); try { - threadPool.shutdown(); + futures = copyFolderRecursively(copyTask); + System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads"); } finally { checkCopyResults(futures); + System.out.println("Copy tasks are done"); + threadPool.shutdown(); } } - } - private void checkCopyResults(List<Future<Void>> futures) throws IOException { - Throwable t = null; - for (Future<Void> future : futures) { - try { - future.get(); - } catch (CancellationException ce) { - t = ce; - logError("Copy task was cancelled", ce); - } catch (ExecutionException ee) { - t = ee.getCause(); - logError("Copy task failed with exception", t); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); + private List<Future<CopyTaskConfiguration>> copyFolderRecursively(final CopyTaskConfiguration copyTask) { + List<Future<CopyTaskConfiguration>> taskList = new ArrayList<>(); + File[] fileList = copyTask.srcFile.listFiles(); + if (fileList != null) { + for (final File file : fileList) { + final Path trgName = new Path(copyTask.dstPath, file.getName()); + if (file.isDirectory()) { + taskList.addAll(copyFolderRecursively( + new CopyTaskConfiguration(copyTask.fs, file, trgName))); + } else { + final long blockSize = BlockSizeCalculator + .getValidBlockSize(file.length(), fsLimitsMinBlockSize, bytesPerChecksum); + taskList.add(threadPool + .submit(new CopyTaskCallable(copyTask, file, trgName, blockSize, failedCopyTasks))); + } + } } + return taskList; } - if (t != null) { - throw new IOException ("At least one copy task failed with exception", t); + + private void checkCopyResults(final List<Future<CopyTaskConfiguration>> futures) + throws IOException { + boolean exceptionOccurred = false; + for (Future<CopyTaskConfiguration> future : futures) { + CopyTaskConfiguration cp; + try { + cp = future.get(); + if (cp != null) { + failedCopyTasks.remove(cp); + } + } catch (CancellationException ce) { + exceptionOccurred = true; + logError("Copy task was cancelled", ce); + } catch (ExecutionException ee) { + exceptionOccurred = true; + logError("Copy task failed with exception", ee.getCause()); + } catch (InterruptedException ie) { + exceptionOccurred = true; + Thread.currentThread().interrupt(); + } + } + if (exceptionOccurred) { + System.err.println("At least one copy task failed with exception. Retrying failed copy tasks."); + retryFailedCopyTasks(); + + if (!failedCopyTasks.isEmpty() && retryCount == 0) { + throw new IOException("At least one copy task failed with exception"); + } + } } - } - private List<Future<Void>> copyFolderRecursively(final FileSystem fs, final ExecutorService threadPool, - File srcFile, final Path dstPath) throws IOException { - List<Future<Void>> taskList = new ArrayList<Future<Void>>(); - File[] files = srcFile.listFiles(); - - if (files != null) { - for (final File file : files) { - final Path trgName = new Path(dstPath, file.getName()); - if (file.isDirectory()) { - taskList.addAll(copyFolderRecursively(fs, threadPool, file, trgName)); - } else { - taskList.add(threadPool.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - fs.copyFromLocalFile(new Path(file.toURI()), trgName); - return null; - } - })); + private void retryFailedCopyTasks() throws IOException { + + while (retryCount > 0 && !failedCopyTasks.isEmpty()) { + try { + System.err.println("Waiting " + retryDelayInMs + " ms before retrying failed copy tasks."); + Thread.sleep(retryDelayInMs); + retryDelayInMs = retryDelayInMs * 2; + } catch (InterruptedException e) { + System.err.println(e.getMessage()); } + + for (CopyTaskConfiguration cp : failedCopyTasks) { + System.err.println("Retrying to copy " + cp.srcFile + " to " + cp.dstPath); + try { + copyFromLocalFile(cp); + failedCopyTasks.remove(cp); + } + catch (IOException e) { + System.err.printf("Copying [%s] to [%s] failed with exception: [%s]%n. Proceed to next file.%n" + ,cp.srcFile, cp.dstPath, e.getMessage()); + } + } + + --retryCount; + } + + if (!failedCopyTasks.isEmpty() && retryCount == 0) { + throw new IOException("Could not install Oozie ShareLib properly."); } - } else { - System.out.println("WARNING: directory listing of " + srcFile.getAbsolutePath().toString() + " returned null"); } - return taskList; + private void copyFromLocalFile(CopyTaskConfiguration cp) throws IOException{ + final FileSystem fs = cp.fs; + fs.delete(cp.dstPath, false); + fs.copyFromLocalFile(false, new Path(cp.srcFile.toURI()), cp.dstPath); + } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java b/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java new file mode 100644 index 0000000..d344300 --- /dev/null +++ b/tools/src/test/java/org/apache/oozie/tools/OozieSharelibFileOperations.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.tools; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.Writer; +import java.util.List; + +public final class OozieSharelibFileOperations { + + /** + * Suppress default constructor for noninstantiability + */ + private OozieSharelibFileOperations() { + throw new UnsupportedOperationException(); + } + + /** + * generate a number of files equals with fileNr, and save the fileList parameter + * @param fileNr number of files to be generated + * @param fileList a list of the generated files + * @throws Exception + */ + public static void generateAndWriteFiles(File libDirectory, int fileNr, List<File> fileList) throws IOException { + for (int i=0; i<fileNr; i++) { + String fileName = generateFileName(i); + String fileContent = generateFileContent(i); + fileList.add(writeFile(libDirectory, fileName, fileContent)); + } + } + + /** + * Create a file in a specified folder, with a specific name and content + * @param folder source folder + * @param filename name of the generated file + * @param content content of the generated file + * @return the created file + * @throws Exception + */ + public static File writeFile(File folder, String filename, String content) throws IOException { + File file = new File(folder.getAbsolutePath() + File.separator + filename); + Writer writer = new FileWriter(file); + writer.write(content); + writer.flush(); + writer.close(); + return file; + } + + public static String generateFileName(int i) { + return "file_" + i; + } + + public static String generateFileContent(int i) { + return "test File " + i; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java b/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java new file mode 100644 index 0000000..b4a668e --- /dev/null +++ b/tools/src/test/java/org/apache/oozie/tools/TestBlockSizeCalculator.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.tools; + +import junit.framework.TestCase; +import org.junit.Assert; + +public class TestBlockSizeCalculator extends TestCase{ + + private long minBlockSize = 1048576; + private long bytesPerChecksum = 512; + + public void testGetValidBlockSizeWhenFileLengthLowerThanMinBlockSize() { + long fileLength = 615100; + long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum); + Assert.assertEquals("The block size should be equal to the defined min block size", minBlockSize, validBlockSize); + } + + public void testGetValidBlockSizeWhenBytesPerChecksumDoesNotDivideFileLength() { + long fileLength = 1048577; + long expectedBlockSize = (fileLength / bytesPerChecksum + 1) * bytesPerChecksum; + long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum); + Assert.assertEquals("The block size should be the first greater value than the file size, dividable by bytes per checksum", + expectedBlockSize, validBlockSize); + } + + public void testGetValidBlockSizeWhenBytesPerChecksumDivideFileLength() { + long fileLength = 1049088; + long validBlockSize = OozieSharelibCLI.BlockSizeCalculator.getValidBlockSize(fileLength, minBlockSize, bytesPerChecksum); + Assert.assertEquals("The block size should be equal with the file length", fileLength, validBlockSize); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java b/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java new file mode 100644 index 0000000..d77eba6 --- /dev/null +++ b/tools/src/test/java/org/apache/oozie/tools/TestConcurrentCopyFromLocal.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.tools; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.test.XTestCase; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +public class TestConcurrentCopyFromLocal extends XTestCase { + + private final String outPath = "outFolder"; + private final TemporaryFolder tmpFolder = new TemporaryFolder(); + private File libDirectory; + private Services services = null; + private Path dstPath = null; + private FileSystem fs; + + @Override + protected void setUp() throws Exception { + super.setUp(false); + tmpFolder.create(); + libDirectory = tmpFolder.newFolder("lib"); + services = new Services(); + services.get(ConfigurationService.class).getConf() + .set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService," + + "org.apache.oozie.service.SchedulerService," + + "org.apache.oozie.service.HadoopAccessorService," + + "org.apache.oozie.service.ShareLibService"); + services.init(); + + HadoopAccessorService has = services.get(HadoopAccessorService.class); + URI uri = new Path(outPath).toUri(); + Configuration fsConf = has.createConfiguration(uri.getAuthority()); + fs = has.createFileSystem(System.getProperty("user.name"), uri, fsConf); + + WorkflowAppService lwas = services.get(WorkflowAppService.class); + dstPath = lwas.getSystemLibPath(); + } + + @Override + protected void tearDown() throws Exception { + tmpFolder.delete(); + services.destroy(); + super.tearDown(); + } + + public void testConcurrentCopyFromLocalSameFileNrAndThreadNr() throws Exception { + final int testFiles = 15; + final int threadPoolSize = 15; + final long fsLimitsMinBlockSize = 1048576; + final long bytesPerChecksum = 512; + performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum); + } + + public void testConcurrentCopyFromLocalMoreThreadsThanFiles() throws Exception { + final int testFiles = 15; + final int threadPoolSize = 35; + final long fsLimitsMinBlockSize = 1048576; + final long bytesPerChecksum = 512; + performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum); + } + + public void testConcurrentCopyFromLocalHighThreadNr() throws Exception { + final int testFiles = 200; + final int threadPoolSize = 150; + final long fsLimitsMinBlockSize = 1048576; + final long bytesPerChecksum = 512; + performAndCheckConcurrentCopy(testFiles, threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum); + } + + private void performAndCheckConcurrentCopy(final int testFiles, final int threadPoolSize, final long fsLimitsMinBlockSize, + final long bytesPerChecksum) throws Exception { + List<File> fileList = new ArrayList<>(); + + OozieSharelibFileOperations.generateAndWriteFiles(libDirectory, testFiles, fileList); + File srcFile = new File(libDirectory.getParentFile().getAbsolutePath()); + OozieSharelibCLI.ConcurrentCopyFromLocal concurrentCopy = new OozieSharelibCLI + .ConcurrentCopyFromLocal(threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum); + concurrentCopy.concurrentCopyFromLocal(fs, srcFile, dstPath); + + for (int i = 0; i < testFiles; i++) { + try ( + InputStream originalFileStream = new FileInputStream(fileList.get(i)); + InputStream copiedFileStream = fs.open(new Path(dstPath + File.separator + "lib", + fileList.get(i).getName()))){ + + assertTrue("The content of the files must be equal", IOUtils.contentEquals(originalFileStream, copiedFileStream)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java b/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java new file mode 100644 index 0000000..bce0433 --- /dev/null +++ b/tools/src/test/java/org/apache/oozie/tools/TestCopyTaskCallable.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.oozie.tools; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.test.XTestCase; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + + +public class TestCopyTaskCallable extends XTestCase { + private final String outPath = "outFolder"; + private final TemporaryFolder tmpFolder = new TemporaryFolder(); + private File libDirectory; + private Services services = null; + private Path dstPath = null; + private FileSystem fs; + + @Override + protected void setUp() throws Exception { + super.setUp(false); + tmpFolder.create(); + libDirectory = tmpFolder.newFolder("lib"); + services = new Services(); + services.getConf() + .set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService," + + "org.apache.oozie.service.SchedulerService," + + "org.apache.oozie.service.HadoopAccessorService," + + "org.apache.oozie.service.ShareLibService"); + services.init(); + + HadoopAccessorService has = services.get(HadoopAccessorService.class); + URI uri = new Path(outPath).toUri(); + Configuration fsConf = has.createConfiguration(uri.getAuthority()); + fs = has.createFileSystem(System.getProperty("user.name"), uri, fsConf); + + WorkflowAppService lwas = services.get(WorkflowAppService.class); + dstPath = lwas.getSystemLibPath(); + } + + @Override + protected void tearDown() throws Exception { + tmpFolder.delete(); + services.destroy(); + super.tearDown(); + } + + public void testCallCopyTaskSameFileNrAndThreadNr() throws Exception { + final long blockSize = 1048576; + final int testFiles = 150; + final int poolSize = 150; + performAndCheckCallCopyTask(blockSize, poolSize, testFiles); + } + + public void testCallCopyTaskOneThread() throws Exception { + final long blockSize = 1048576; + final int testFiles = 15; + final int poolSize = 1; + performAndCheckCallCopyTask(blockSize, poolSize, testFiles); + } + + public void testCallCopyTaskMoreFilesThanThreads() throws Exception { + final long blockSize = 1048576; + final int testFiles = 150; + final int poolSize = 10; + performAndCheckCallCopyTask(blockSize, poolSize, testFiles); + } + + public void testCallCopyTaskMoreThreadsThanFiles() throws Exception { + final long blockSize = 1048576; + final int testFiles = 15; + final int poolSize = 20; + performAndCheckCallCopyTask(blockSize, poolSize, testFiles); + } + + private void performAndCheckCallCopyTask(final long blockSize, final int poolSize, final int testFiles) throws Exception { + Set<OozieSharelibCLI.CopyTaskConfiguration> failedCopyTasks = new ConcurrentHashSet<>(); + + List<File> fileList = new ArrayList<>(); + OozieSharelibFileOperations.generateAndWriteFiles(libDirectory, testFiles, fileList); + + File srcFile = new File(libDirectory.getParentFile().getAbsolutePath()); + + OozieSharelibCLI.CopyTaskConfiguration copyTask = + new OozieSharelibCLI.CopyTaskConfiguration(fs, srcFile, dstPath); + List<Future<OozieSharelibCLI.CopyTaskConfiguration>> taskList = new ArrayList<>(); + + final ExecutorService threadPool = Executors.newFixedThreadPool(poolSize); + try { + for (final File file : libDirectory.listFiles()) { + final Path trgName = new Path(dstPath, file.getName()); + taskList.add(threadPool + .submit(new OozieSharelibCLI.CopyTaskCallable(copyTask, file, trgName, blockSize, failedCopyTasks))); + } + for (Future<OozieSharelibCLI.CopyTaskConfiguration> future : taskList) { + future.get(); + } + } finally { + threadPool.shutdown(); + } + + for (int i = 0; i < testFiles; i++) { + + try ( + InputStream originalFileStream = new FileInputStream(fileList.get(i)); + InputStream copiedFileStream = fs.open(new Path(dstPath, fileList.get(i).getName()))){ + assertTrue("The content of the files must be equal", IOUtils.contentEquals(originalFileStream, copiedFileStream)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/6b89aba4/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java ---------------------------------------------------------------------- diff --git a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java index f53d987..5929e5c 100644 --- a/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java +++ b/tools/src/test/java/org/apache/oozie/tools/TestOozieSharelibCLI.java @@ -19,19 +19,12 @@ package org.apache.oozie.tools; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileWriter; -import java.io.InputStream; -import java.io.PrintStream; -import java.io.Writer; -import java.net.URI; import org.apache.commons.io.IOUtils; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.oozie.action.hadoop.security.LauncherSecurityManager; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.ServiceException; import org.apache.oozie.service.Services; @@ -40,21 +33,29 @@ import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.test.XTestCase; import org.junit.rules.TemporaryFolder; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.InputStream; +import java.io.PrintStream; +import java.net.URI; + /** * Test OozieSharelibCLI */ public class TestOozieSharelibCLI extends XTestCase { - private final String outPath = "outFolder"; + public final String outPath = "outFolder"; + private final TemporaryFolder tmpFolder = new TemporaryFolder(); + private File libDirectory; private Services services = null; private Path dstPath = null; private FileSystem fs; - private final TemporaryFolder tmpFolder = new TemporaryFolder(); private LauncherSecurityManager launcherSecurityManager; @Override protected void setUp() throws Exception { launcherSecurityManager = new LauncherSecurityManager(); launcherSecurityManager.enable(); tmpFolder.create(); + libDirectory = tmpFolder.newFolder("lib"); super.setUp(false); } @@ -65,7 +66,6 @@ public class TestOozieSharelibCLI extends XTestCase { if (services != null) { services.destroy(); } - tmpFolder.delete(); super.tearDown(); } @@ -100,10 +100,8 @@ public class TestOozieSharelibCLI extends XTestCase { */ public void testOozieSharelibCLICreate() throws Exception { - File libDirectory = tmpFolder.newFolder("lib"); - - writeFile(libDirectory, "file1", "test File"); - writeFile(libDirectory, "file2", "test File2"); + OozieSharelibFileOperations.writeFile(libDirectory, "file1", "test File"); + OozieSharelibFileOperations.writeFile(libDirectory, "file2", "test File2"); String[] argsCreate = { "create", "-fs", outPath, "-locallib", libDirectory.getParentFile().getAbsolutePath() }; assertEquals(0, execOozieSharelibCLICommands(argsCreate)); @@ -127,10 +125,9 @@ public class TestOozieSharelibCLI extends XTestCase { final int testFiles = 7; final int concurrency = 5; - File libDirectory = tmpFolder.newFolder("lib"); - for (int i = 0; i < testFiles; i++) { - writeFile(libDirectory, generateFileName(i), generateFileContent(i)); + OozieSharelibFileOperations.writeFile(libDirectory, OozieSharelibFileOperations.generateFileName(i), + OozieSharelibFileOperations.generateFileContent(i)); } String[] argsCreate = {"create", "-fs", outPath, "-locallib", libDirectory.getParentFile().getAbsolutePath(), @@ -145,11 +142,11 @@ public class TestOozieSharelibCLI extends XTestCase { // test files in new folder for (int i = 0; i < testFiles; i++) { - String fileName = generateFileName(i); - String expectedFileContent = generateFileContent(i); + String fileName = OozieSharelibFileOperations.generateFileName(i); + String expectedFileContent = OozieSharelibFileOperations.generateFileContent(i); InputStream in = null; try { - in = fs.open(new Path(latestLibPath, fileName)); + in = getTargetFileSysyem().open(new Path(latestLibPath, fileName)); String actualFileContent = IOUtils.toString(in); assertEquals(fileName, expectedFileContent, actualFileContent); } finally { @@ -193,7 +190,7 @@ public class TestOozieSharelibCLI extends XTestCase { private Services getServices() throws ServiceException { if (services == null) { services = new Services(); - services.getConf() + services.get(ConfigurationService.class).getConf() .set(Services.CONF_SERVICE_CLASSES,"org.apache.oozie.service.LiteWorkflowAppService," + "org.apache.oozie.service.SchedulerService," + "org.apache.oozie.service.HadoopAccessorService," @@ -211,15 +208,6 @@ public class TestOozieSharelibCLI extends XTestCase { return dstPath; } - private void writeFile(File folder, String filename, String content) throws Exception { - File file = new File(folder.getAbsolutePath() + File.separator + filename); - Writer writer = new FileWriter(file); - writer.write(content); - writer.flush(); - writer.close(); - - } - private int execOozieSharelibCLICommands(String[] args) throws Exception { try { OozieSharelibCLI.main(args); @@ -236,12 +224,4 @@ public class TestOozieSharelibCLI extends XTestCase { } return 1; } - - private static String generateFileName(int i) { - return "file_" + i; - } - - private static String generateFileContent(int i) { - return "test File " + i; - } }
