Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java?rev=1495297&view=auto ============================================================================== --- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java (added) +++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/TestIntegration.java Fri Jun 21 06:37:27 2013 @@ -0,0 +1,634 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.tools.distcp2; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MiniMRCluster; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.tools.distcp2.util.TestDistCpUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestIntegration { + private static final Log LOG = LogFactory.getLog(TestIntegration.class); + + private static MiniDFSCluster miniDfsCluster = null; + private static MiniMRCluster miniMrCluster = null; + + private static FileSystem fs; + + private static Path listFile; + private static Path target; + private static String root; + + private static JobConf getConf() { + return miniMrCluster.createJobConf(); + } + + @BeforeClass + public static void setup() { + try { + miniDfsCluster = new MiniDFSCluster(new Configuration(), 3, true, null); + fs = miniDfsCluster.getFileSystem(); + final String namenode = fs.getUri().toString(); + miniMrCluster = new MiniMRCluster(3, namenode, 1); + + listFile = new Path("target/tmp/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + target = new Path("target/tmp/target").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + root = new Path("target/tmp").makeQualified(fs.getUri(), + fs.getWorkingDirectory()).toString(); + TestDistCpUtils.delete(fs, root); + } catch (IOException e) { + LOG.error("Exception 
encountered ", e); + } + } + + @AfterClass + public static void shutdown() throws IOException { + miniMrCluster.shutdown(); + fs.close(); + miniDfsCluster.shutdown(); + } + + @Test(timeout=100000) + public void testSingleFileMissingTarget() { + caseSingleFileMissingTarget(false); + caseSingleFileMissingTarget(true); + } + + private void caseSingleFileMissingTarget(boolean sync) { + + try { + addEntries(listFile, "singlefile1/file1"); + createFiles("singlefile1/file1"); + + runTest(listFile, target, sync); + + checkResult(target, 1); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testSingleFileTargetFile() { + caseSingleFileTargetFile(false); + caseSingleFileTargetFile(true); + } + + private void caseSingleFileTargetFile(boolean sync) { + + try { + addEntries(listFile, "singlefile1/file1"); + createFiles("singlefile1/file1", "target"); + + runTest(listFile, target, sync); + + checkResult(target, 1); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testSingleFileTargetDir() { + caseSingleFileTargetDir(false); + caseSingleFileTargetDir(true); + } + + private void caseSingleFileTargetDir(boolean sync) { + + try { + addEntries(listFile, "singlefile2/file2"); + createFiles("singlefile2/file2"); + mkdirs(target.toString()); + + runTest(listFile, target, sync); + + checkResult(target, 1, "file2"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testSingleDirTargetMissing() { + caseSingleDirTargetMissing(false); + caseSingleDirTargetMissing(true); + } + + private 
void caseSingleDirTargetMissing(boolean sync) { + + try { + addEntries(listFile, "singledir"); + mkdirs(root + "/singledir/dir1"); + + runTest(listFile, target, sync); + + checkResult(target, 1, "dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testSingleDirTargetPresent() { + + try { + addEntries(listFile, "singledir"); + mkdirs(root + "/singledir/dir1"); + mkdirs(target.toString()); + + runTest(listFile, target, false); + + checkResult(target, 1, "singledir/dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testUpdateSingleDirTargetPresent() { + + try { + addEntries(listFile, "Usingledir"); + mkdirs(root + "/Usingledir/Udir1"); + mkdirs(target.toString()); + + runTest(listFile, target, true); + + checkResult(target, 1, "Udir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testMultiFileTargetPresent() { + caseMultiFileTargetPresent(false); + caseMultiFileTargetPresent(true); + } + + private void caseMultiFileTargetPresent(boolean sync) { + + try { + addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + mkdirs(target.toString()); + + runTest(listFile, target, sync); + + checkResult(target, 3, "file3", "file4", "file5"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void 
testCustomCopyListing() { + + try { + addEntries(listFile, "multifile1/file3", "multifile1/file4", "multifile1/file5"); + createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5"); + mkdirs(target.toString()); + + Configuration conf = getConf(); + try { + conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS, + CustomCopyListing.class, CopyListing.class); + DistCpOptions options = new DistCpOptions(Arrays. + asList(new Path(root + "/" + "multifile1")), target); + options.setSyncFolder(true); + options.setDeleteMissing(false); + options.setOverwrite(false); + try { + new DistCp(conf, options).execute(); + } catch (Exception e) { + LOG.error("Exception encountered ", e); + throw new IOException(e); + } + } finally { + conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS); + } + + checkResult(target, 2, "file4", "file5"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + private static class CustomCopyListing extends SimpleCopyListing { + + public CustomCopyListing(Configuration configuration, + Credentials credentials) { + super(configuration, credentials); + } + + @Override + protected boolean shouldCopy(Path path, DistCpOptions options) { + return !path.getName().equals("file3"); + } + } + + @Test(timeout=100000) + public void testMultiFileTargetMissing() { + caseMultiFileTargetMissing(false); + caseMultiFileTargetMissing(true); + } + + private void caseMultiFileTargetMissing(boolean sync) { + + try { + addEntries(listFile, "multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + + runTest(listFile, target, sync); + + checkResult(target, 3, "file3", "file4", "file5"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, 
root); + } + } + + @Test(timeout=100000) + public void testMultiDirTargetPresent() { + + try { + addEntries(listFile, "multifile", "singledir"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + mkdirs(target.toString(), root + "/singledir/dir1"); + + runTest(listFile, target, false); + + checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", "singledir/dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testUpdateMultiDirTargetPresent() { + + try { + addEntries(listFile, "Umultifile", "Usingledir"); + createFiles("Umultifile/Ufile3", "Umultifile/Ufile4", "Umultifile/Ufile5"); + mkdirs(target.toString(), root + "/Usingledir/Udir1"); + + runTest(listFile, target, true); + + checkResult(target, 4, "Ufile3", "Ufile4", "Ufile5", "Udir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testMultiDirTargetMissing() { + + try { + addEntries(listFile, "multifile", "singledir"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + mkdirs(root + "/singledir/dir1"); + + runTest(listFile, target, false); + + checkResult(target, 2, "multifile/file3", "multifile/file4", + "multifile/file5", "singledir/dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testUpdateMultiDirTargetMissing() { + + try { + addEntries(listFile, "multifile", "singledir"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + mkdirs(root + "/singledir/dir1"); + + runTest(listFile, target, true); + + 
checkResult(target, 4, "file3", "file4", "file5", "dir1"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + } + } + + @Test(timeout=100000) + public void testDeleteMissingInDestination() { + + try { + addEntries(listFile, "srcdir"); + createFiles("srcdir/file1", "dstdir/file1", "dstdir/file2"); + + Path target = new Path(root + "/dstdir"); + runTest(listFile, target, true, true, false); + + checkResult(target, 1, "file1"); + } catch (IOException e) { + LOG.error("Exception encountered while running distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test(timeout=100000) + public void testOverwrite() { + byte[] contents1 = "contents1".getBytes(); + byte[] contents2 = "contents2".getBytes(); + Assert.assertEquals(contents1.length, contents2.length); + + try { + addEntries(listFile, "srcdir"); + createWithContents("srcdir/file1", contents1); + createWithContents("dstdir/file1", contents2); + + Path target = new Path(root + "/dstdir"); + runTest(listFile, target, false, false, true); + + checkResult(target, 1, "file1"); + + // make sure dstdir/file1 has been overwritten with the contents + // of srcdir/file1 + FSDataInputStream is = fs.open(new Path(root + "/dstdir/file1")); + byte[] dstContents = new byte[contents1.length]; + is.readFully(dstContents); + is.close(); + Assert.assertArrayEquals(contents1, dstContents); + } catch (IOException e) { + LOG.error("Exception encountered while running distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test(timeout=100000) + public void testGlobTargetMissingSingleLevel() { + + try { + Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + 
addEntries(listFile, "*"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("singledir/dir2/file6"); + + runTest(listFile, target, false); + + checkResult(target, 2, "multifile/file3", "multifile/file4", "multifile/file5", + "singledir/dir2/file6"); + } catch (IOException e) { + LOG.error("Exception encountered while testing distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test(timeout=100000) + public void testUpdateGlobTargetMissingSingleLevel() { + + try { + Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + addEntries(listFile, "*"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("singledir/dir2/file6"); + + runTest(listFile, target, true); + + checkResult(target, 4, "file3", "file4", "file5", "dir2/file6"); + } catch (IOException e) { + LOG.error("Exception encountered while running distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test(timeout=100000) + public void testGlobTargetMissingMultiLevel() { + + try { + Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + addEntries(listFile, "*/*"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("singledir1/dir3/file7", "singledir1/dir3/file8", + "singledir1/dir3/file9"); + + runTest(listFile, target, false); + + checkResult(target, 4, "file3", "file4", "file5", + "dir3/file7", "dir3/file8", "dir3/file9"); + } catch (IOException e) { + LOG.error("Exception encountered while running distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test(timeout=100000) + public void 
testUpdateGlobTargetMissingMultiLevel() { + + try { + Path listFile = new Path("target/tmp1/listing").makeQualified(fs.getUri(), + fs.getWorkingDirectory()); + addEntries(listFile, "*/*"); + createFiles("multifile/file3", "multifile/file4", "multifile/file5"); + createFiles("singledir1/dir3/file7", "singledir1/dir3/file8", + "singledir1/dir3/file9"); + + runTest(listFile, target, true); + + checkResult(target, 6, "file3", "file4", "file5", + "file7", "file8", "file9"); + } catch (IOException e) { + LOG.error("Exception encountered while running distcp", e); + Assert.fail("distcp failure"); + } finally { + TestDistCpUtils.delete(fs, root); + TestDistCpUtils.delete(fs, "target/tmp1"); + } + } + + @Test(timeout=100000) + public void testCleanup() { + try { + Path sourcePath = new Path("noscheme:///file"); + List<Path> sources = new ArrayList<Path>(); + sources.add(sourcePath); + + DistCpOptions options = new DistCpOptions(sources, target); + + JobConf conf = getConf(); + Path stagingDir = JobSubmissionFiles.getStagingDir( + new JobClient(conf), conf); + stagingDir.getFileSystem(conf).mkdirs(stagingDir); + + try { + new DistCp(conf, options).execute(); + } catch (Throwable t) { + Assert.assertEquals(stagingDir.getFileSystem(conf). + listStatus(stagingDir).length, 0); + } + } catch (Exception e) { + LOG.error("Exception encountered ", e); + Assert.fail("testCleanup failed " + e.getMessage()); + } + } + + private void addEntries(Path listFile, String... entries) throws IOException { + OutputStream out = fs.create(listFile); + try { + for (String entry : entries){ + out.write((root + "/" + entry).getBytes()); + out.write("\n".getBytes()); + } + } finally { + out.close(); + } + } + + private void createFiles(String... 
entries) throws IOException { + for (String entry : entries){ + if (!entry.startsWith("hdfs://")) { + entry = root + "/" + entry; + } + OutputStream out = fs.create(new Path(entry)); + try { + out.write((root + "/" + entry).getBytes()); + out.write("\n".getBytes()); + } finally { + out.close(); + } + } + } + + private void createWithContents(String entry, byte[] contents) throws IOException { + OutputStream out = fs.create(new Path(root + "/" + entry)); + try { + out.write(contents); + } finally { + out.close(); + } + } + + private void mkdirs(String... entries) throws IOException { + for (String entry : entries){ + fs.mkdirs(new Path(entry)); + } + } + + private void runTest(Path listFile, Path target, boolean sync) throws IOException { + runTest(listFile, target, sync, false, false); + } + + private void runTest(Path listFile, Path target, boolean sync, boolean delete, + boolean overwrite) throws IOException { + DistCpOptions options = new DistCpOptions(listFile, target); + options.setSyncFolder(sync); + options.setDeleteMissing(delete); + options.setOverwrite(overwrite); + try { + new DistCp(getConf(), options).execute(); + } catch (Exception e) { + LOG.error("Exception encountered ", e); + throw new IOException(e); + } + } + + private void checkResult(Path target, int count, String... relPaths) throws IOException { + Assert.assertEquals(count, fs.listStatus(target).length); + if (relPaths == null || relPaths.length == 0) { + Assert.assertTrue(target.toString(), fs.exists(target)); + return; + } + for (String relPath : relPaths) { + Assert.assertTrue(new Path(target, relPath).toString(), fs.exists(new Path(target, relPath))); + } + } + +}
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java?rev=1495297&view=auto ============================================================================== --- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java (added) +++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/TestOptionsParser.java Fri Jun 21 06:37:27 2013 @@ -0,0 +1,522 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.tools.distcp2; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute; +import org.junit.Assert; +import org.junit.Test; + +public class TestOptionsParser { + + @Test + public void testParseIgnoreFailure() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldIgnoreFailures()); + + options = OptionsParser.parse(new String[] { + "-i", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldIgnoreFailures()); + } + + @Test + public void testParseOverwrite() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldOverwrite()); + + options = OptionsParser.parse(new String[] { + "-overwrite", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldOverwrite()); + + try { + OptionsParser.parse(new String[] { + "-update", + "-overwrite", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.fail("Update and overwrite aren't allowed together"); + } catch (IllegalArgumentException ignore) { + } + } + + @Test + public void testLogPath() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertNull(options.getLogPath()); + + options = OptionsParser.parse(new String[] { + "-log", + "hdfs://localhost:8020/logs", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getLogPath(), new Path("hdfs://localhost:8020/logs")); + } + + @Test + public void 
testParseBlokcing() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldBlock()); + + options = OptionsParser.parse(new String[] { + "-async", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldBlock()); + } + + @Test + public void testParsebandwidth() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB); + + options = OptionsParser.parse(new String[] { + "-bandwidth", + "11", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getMapBandwidth(), 11); + } + + @Test(expected=IllegalArgumentException.class) + public void testParseNonPositiveBandwidth() { + OptionsParser.parse(new String[] { + "-bandwidth", + "-11", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + } + + @Test(expected=IllegalArgumentException.class) + public void testParseZeroBandwidth() { + OptionsParser.parse(new String[] { + "-bandwidth", + "0", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + } + + @Test + public void testParseSkipCRC() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldSkipCRC()); + + options = OptionsParser.parse(new String[] { + "-update", + "-skipcrccheck", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldSyncFolder()); + Assert.assertTrue(options.shouldSkipCRC()); + } + + @Test + public void testParseAtomicCommit() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + 
"hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldAtomicCommit()); + + options = OptionsParser.parse(new String[] { + "-atomic", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldAtomicCommit()); + + try { + OptionsParser.parse(new String[] { + "-atomic", + "-update", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.fail("Atomic and sync folders were allowed"); + } catch (IllegalArgumentException ignore) { } + } + + @Test + public void testParseWorkPath() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertNull(options.getAtomicWorkPath()); + + options = OptionsParser.parse(new String[] { + "-atomic", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertNull(options.getAtomicWorkPath()); + + options = OptionsParser.parse(new String[] { + "-atomic", + "-tmp", + "hdfs://localhost:8020/work", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getAtomicWorkPath(), new Path("hdfs://localhost:8020/work")); + + try { + OptionsParser.parse(new String[] { + "-tmp", + "hdfs://localhost:8020/work", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.fail("work path was allowed without -atomic switch"); + } catch (IllegalArgumentException ignore) {} + } + + @Test + public void testParseSyncFolders() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldSyncFolder()); + + options = OptionsParser.parse(new String[] { + "-update", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldSyncFolder()); + } + + @Test + public void testParseDeleteMissing() { + 
DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldDeleteMissing()); + + options = OptionsParser.parse(new String[] { + "-update", + "-delete", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldSyncFolder()); + Assert.assertTrue(options.shouldDeleteMissing()); + + options = OptionsParser.parse(new String[] { + "-overwrite", + "-delete", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldOverwrite()); + Assert.assertTrue(options.shouldDeleteMissing()); + + try { + OptionsParser.parse(new String[] { + "-atomic", + "-delete", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.fail("Atomic and delete folders were allowed"); + } catch (IllegalArgumentException ignore) { } + } + + @Test + public void testParseSSLConf() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertNull(options.getSslConfigurationFile()); + + options = OptionsParser.parse(new String[] { + "-mapredSslConf", + "/tmp/ssl-client.xml", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getSslConfigurationFile(), "/tmp/ssl-client.xml"); + } + + @Test + public void testParseMaps() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getMaxMaps(), DistCpConstants.DEFAULT_MAPS); + + options = OptionsParser.parse(new String[] { + "-m", + "1", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getMaxMaps(), 1); + + options = OptionsParser.parse(new String[] { + "-m", + "0", + "hdfs://localhost:8020/source/first", 
+ "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getMaxMaps(), 1); + + try { + OptionsParser.parse(new String[] { + "-m", + "hello", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.fail("Non numberic map parsed"); + } catch (IllegalArgumentException ignore) { } + + try { + OptionsParser.parse(new String[] { + "-mapredXslConf", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.fail("Non numberic map parsed"); + } catch (IllegalArgumentException ignore) { } + } + + @Test + public void testSourceListing() { + DistCpOptions options = OptionsParser.parse(new String[] { + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getSourceFileListing(), + new Path("hdfs://localhost:8020/source/first")); + } + + @Test + public void testSourceListingAndSourcePath() { + try { + OptionsParser.parse(new String[] { + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.fail("Both source listing & source paths allowed"); + } catch (IllegalArgumentException ignore) {} + } + + @Test + public void testMissingSourceInfo() { + try { + OptionsParser.parse(new String[] { + "hdfs://localhost:8020/target/"}); + Assert.fail("Neither source listing not source paths present"); + } catch (IllegalArgumentException ignore) {} + } + + @Test + public void testMissingTarget() { + try { + OptionsParser.parse(new String[] { + "-f", "hdfs://localhost:8020/source"}); + Assert.fail("Missing target allowed"); + } catch (IllegalArgumentException ignore) {} + } + + @Test + public void testInvalidArgs() { + try { + OptionsParser.parse(new String[] { + "-m", "-f", "hdfs://localhost:8020/source"}); + Assert.fail("Missing map value"); + } catch (IllegalArgumentException ignore) {} + } + + @Test + public void testToString() { + DistCpOptions option = new DistCpOptions(new Path("abc"), 
new Path("xyz")); + String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " + + "ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " + + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz}"; + Assert.assertEquals(val, option.toString()); + Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), + DistCpOptionSwitch.ATOMIC_COMMIT.name()); + } + + @Test + public void testCopyStrategy() { + DistCpOptions options = OptionsParser.parse(new String[] { + "-strategy", + "dynamic", + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getCopyStrategy(), "dynamic"); + + options = OptionsParser.parse(new String[] { + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getCopyStrategy(), DistCpConstants.UNIFORMSIZE); + } + + @Test + public void testTargetPath() { + DistCpOptions options = OptionsParser.parse(new String[] { + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals(options.getTargetPath(), new Path("hdfs://localhost:8020/target/")); + } + + @Test + public void testPreserve() { + DistCpOptions options = OptionsParser.parse(new String[] { + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.USER)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP)); + + options = OptionsParser.parse(new String[] { + "-p", + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); + 
Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.USER)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); + + options = OptionsParser.parse(new String[] { + "-p", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.USER)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); + + options = OptionsParser.parse(new String[] { + "-pbr", + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.USER)); + Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP)); + + options = OptionsParser.parse(new String[] { + "-pbrgup", + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.USER)); + Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP)); + + options = OptionsParser.parse(new String[] { + "-p", + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + int i = 0; + Iterator<FileAttribute> attribIterator = options.preserveAttributes(); + while (attribIterator.hasNext()) { + 
attribIterator.next(); + i++; + } + Assert.assertEquals(i, 5); + + try { + OptionsParser.parse(new String[] { + "-pabc", + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target"}); + Assert.fail("Invalid preserve attribute"); + } + catch (IllegalArgumentException ignore) {} + catch (NoSuchElementException ignore) {} + + options = OptionsParser.parse(new String[] { + "-f", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION)); + options.preserve(FileAttribute.PERMISSION); + Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); + + options.preserve(FileAttribute.PERMISSION); + Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION)); + } + + @Test + public void testOptionsSwitchAddToConf() { + Configuration conf = new Configuration(); + Assert.assertNull(conf.get(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel())); + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT); + Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false)); + } + + @Test + public void testOptionsAppendToConf() { + Configuration conf = new Configuration(); + Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false)); + Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false)); + DistCpOptions options = OptionsParser.parse(new String[] { + "-atomic", + "-i", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + options.appendToConf(conf); + Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false)); + Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false)); + Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), + DistCpConstants.DEFAULT_BANDWIDTH_MB); + + conf = new Configuration(); + 
Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); + Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false)); + Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), null); + options = OptionsParser.parse(new String[] { + "-update", + "-delete", + "-pu", + "-bandwidth", + "11", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + options.appendToConf(conf); + Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); + Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false)); + Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U"); + Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11); + } +} Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java?rev=1495297&view=auto ============================================================================== --- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java (added) +++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/tools/distcp2/mapred/TestCopyCommitter.java Fri Jun 21 06:37:27 2013 @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.distcp2.mapred; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.tools.distcp2.CopyListing; +import org.apache.hadoop.tools.distcp2.DistCpConstants; +import org.apache.hadoop.tools.distcp2.DistCpOptions; +import org.apache.hadoop.tools.distcp2.DistCpOptions.FileAttribute; +import org.apache.hadoop.tools.distcp2.GlobbedCopyListing; +import org.apache.hadoop.tools.distcp2.util.TestDistCpUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import 
org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for {@link CopyCommitter#commitJob}: each test builds a task/job
 * context on top of a single-datanode {@link MiniDFSCluster}, runs the commit
 * once, checks the outcome, then runs it a second time to check that repeating
 * the commit gives the same result.
 */
public class TestCopyCommitter {
  private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);

  private static final Random rand = new Random();

  private static final Credentials CREDENTIALS = new Credentials();

  /** Port used to build the {@code mapred.job.tracker} address of the test job. */
  public static final int PORT = 39737;

  private static Configuration config;
  private static MiniDFSCluster cluster;

  /**
   * Creates a map-only job wired to {@link NullInputFormat} and
   * {@link NullOutputFormat}; only its configuration is used by the tests.
   */
  private static Job getJobForClient() throws IOException {
    Job job = Job.getInstance(new Configuration());
    job.getConfiguration().set("mapred.job.tracker", "localhost:" + PORT);
    job.setInputFormatClass(NullInputFormat.class);
    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    return job;
  }

  /** Builds the shared configuration and starts the mini DFS cluster. */
  @BeforeClass
  public static void create() throws IOException {
    config = getJobForClient().getConfiguration();
    config.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, 0);
    cluster = new MiniDFSCluster(config, 1, true, null);
  }

  /** Shuts the mini DFS cluster down if it was started. */
  @AfterClass
  public static void destroy() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /** Points the meta-folder setting at {@code /meta} and creates that directory. */
  @Before
  public void createMetaFolder() {
    config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
    Path meta = new Path("/meta");
    try {
      cluster.getFileSystem().mkdirs(meta);
    } catch (IOException e) {
      LOG.error("Exception encountered while creating meta folder", e);
      Assert.fail("Unable to create meta folder");
    }
  }

  /**
   * Fails the test if {@code /meta} still exists once the test body has run.
   * The folder is removed first so that the next test starts clean.
   * NOTE(review): presumably the committer is expected to delete the meta
   * folder during commit -- confirm against CopyCommitter.
   */
  @After
  public void cleanupMetaFolder() {
    Path meta = new Path("/meta");
    try {
      if (cluster.getFileSystem().exists(meta)) {
        cluster.getFileSystem().delete(meta, true);
        Assert.fail("Expected meta folder to be deleted");
      }
    } catch (IOException e) {
      LOG.error("Exception encountered while cleaning up folder", e);
      Assert.fail("Unable to clean up meta folder");
    }
  }

  /** A commit with nothing configured must report "Commit Successful", twice in a row. */
  @Test
  public void testNoCommitAction() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      committer.commitJob(jobContext);
      Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());

      // Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
    } catch (IOException e) {
      LOG.error("Exception encountered ", e);
      Assert.fail("Commit failed");
    }
  }

  /**
   * With permission preservation enabled, every directory under the target
   * work path must carry the source permission after the commit.
   */
  @Test
  public void testPreserveStatus() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      FsPermission sourcePerm = new FsPermission((short) 511);  // 0777
      FsPermission initialPerm = new FsPermission((short) 448); // 0700
      String sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm);
      String targetBase = TestDistCpUtils.createTestSetup(fs, initialPerm);

      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
          new Path("/out"));
      options.preserve(FileAttribute.PERMISSION);
      options.appendToConf(conf);

      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
      Path listingFile = new Path(randomTmpPath());
      listing.buildListing(listingFile, options);

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);

      committer.commitJob(jobContext);
      Assert.assertTrue("Permission don't match",
          checkDirectoryPermissions(fs, targetBase, sourcePerm));

      // Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertTrue("Permission don't match",
          checkDirectoryPermissions(fs, targetBase, sourcePerm));
    } catch (IOException e) {
      LOG.error("Exception encountered while testing for preserve status", e);
      Assert.fail("Preserve status failure");
    } finally {
      TestDistCpUtils.delete(fs, "/tmp1");
    }
  }

  /**
   * With sync-folder and delete-missing enabled, source and target trees must
   * be in sync (checked in both directions) after the commit.
   */
  @Test
  public void testDeleteMissing() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      String sourceBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
      String targetBase = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
      String targetBaseAdd = TestDistCpUtils.createTestSetup(fs, FsPermission.getDefault());
      fs.rename(new Path(targetBaseAdd), new Path(targetBase));

      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
          new Path("/out"));
      options.setSyncFolder(true);
      options.setDeleteMissing(true);
      options.appendToConf(conf);

      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
      Path listingFile = new Path(randomTmpPath());
      listing.buildListing(listingFile, options);

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);

      committer.commitJob(jobContext);
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase));
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase));

      // Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase));
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase));
    } catch (IOException e) {
      // Only I/O problems are translated here; assertion failures propagate
      // unchanged so that their own message is the one reported.
      LOG.error("Exception encountered while testing for delete missing", e);
      Assert.fail("Delete missing failure");
    } finally {
      TestDistCpUtils.delete(fs, "/tmp1");
      conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
    }
  }

  /**
   * Delete-missing over two flat directories whose file names interleave:
   * source holds 1,3,4,5,7,8,9 and target holds 2,4,5,7,9,A. After the commit
   * the target must contain exactly four entries.
   */
  @Test
  public void testDeleteMissingFlatInterleavedFiles() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      String sourceBase = randomTmpPath();
      String targetBase = randomTmpPath();
      TestDistCpUtils.createFile(fs, sourceBase + "/1");
      TestDistCpUtils.createFile(fs, sourceBase + "/3");
      TestDistCpUtils.createFile(fs, sourceBase + "/4");
      TestDistCpUtils.createFile(fs, sourceBase + "/5");
      TestDistCpUtils.createFile(fs, sourceBase + "/7");
      TestDistCpUtils.createFile(fs, sourceBase + "/8");
      TestDistCpUtils.createFile(fs, sourceBase + "/9");

      TestDistCpUtils.createFile(fs, targetBase + "/2");
      TestDistCpUtils.createFile(fs, targetBase + "/4");
      TestDistCpUtils.createFile(fs, targetBase + "/5");
      TestDistCpUtils.createFile(fs, targetBase + "/7");
      TestDistCpUtils.createFile(fs, targetBase + "/9");
      TestDistCpUtils.createFile(fs, targetBase + "/A");

      DistCpOptions options = new DistCpOptions(Arrays.asList(new Path(sourceBase)),
          new Path("/out"));
      options.setSyncFolder(true);
      options.setDeleteMissing(true);
      options.appendToConf(conf);

      CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS);
      Path listingFile = new Path(randomTmpPath());
      listing.buildListing(listingFile, options);

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);

      committer.commitJob(jobContext);
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase));
      Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);

      // Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertTrue("Source and target folders are not in sync",
          TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase));
      Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
    } catch (IOException e) {
      LOG.error("Exception encountered while testing for delete missing", e);
      Assert.fail("Delete missing failure");
    } finally {
      TestDistCpUtils.delete(fs, "/tmp1");
      conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
    }
  }

  /**
   * Atomic commit when the final path does not exist yet: after the commit the
   * work path must be gone and the final path must exist.
   */
  @Test
  public void testAtomicCommitMissingFinal() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    String workPath = randomTmpPath();
    String finalPath = randomTmpPath();
    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      fs.mkdirs(new Path(workPath));

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

      Assert.assertTrue(fs.exists(new Path(workPath)));
      Assert.assertFalse(fs.exists(new Path(finalPath)));
      committer.commitJob(jobContext);
      Assert.assertFalse(fs.exists(new Path(workPath)));
      Assert.assertTrue(fs.exists(new Path(finalPath)));

      // Test for idempotent commit
      committer.commitJob(jobContext);
      Assert.assertFalse(fs.exists(new Path(workPath)));
      Assert.assertTrue(fs.exists(new Path(finalPath)));
    } catch (IOException e) {
      LOG.error("Exception encountered while testing for atomic commit.", e);
      Assert.fail("Atomic commit failure");
    } finally {
      TestDistCpUtils.delete(fs, workPath);
      TestDistCpUtils.delete(fs, finalPath);
    }
  }

  /**
   * Atomic commit when the final path already exists: the commit must throw,
   * and both the work path and the final path must still be present.
   */
  @Test
  public void testAtomicCommitExistingFinal() {
    TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
    JobContext jobContext = getJobContext(taskAttemptContext);
    Configuration conf = jobContext.getConfiguration();

    String workPath = randomTmpPath();
    String finalPath = randomTmpPath();
    FileSystem fs = null;
    try {
      OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
      fs = FileSystem.get(conf);
      fs.mkdirs(new Path(workPath));
      fs.mkdirs(new Path(finalPath));

      conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath);
      conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
      conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);

      Assert.assertTrue(fs.exists(new Path(workPath)));
      Assert.assertTrue(fs.exists(new Path(finalPath)));
      try {
        committer.commitJob(jobContext);
        // Assert.fail raises an AssertionError, which is not an Exception and
        // therefore is not swallowed by the catch clause below.
        Assert.fail("Should not be able to atomic-commit to pre-existing path.");
      } catch(Exception exception) {
        Assert.assertTrue(fs.exists(new Path(workPath)));
        Assert.assertTrue(fs.exists(new Path(finalPath)));
        LOG.info("Atomic-commit Test pass.");
      }
    } catch (IOException e) {
      LOG.error("Exception encountered while testing for atomic commit.", e);
      Assert.fail("Atomic commit failure");
    } finally {
      TestDistCpUtils.delete(fs, workPath);
      TestDistCpUtils.delete(fs, finalPath);
    }
  }

  /** Wraps {@code conf} in a task-attempt context with a fixed attempt id. */
  private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
    return new TaskAttemptContext(conf,
        new TaskAttemptID("200707121733", 1, true, 1, 1));
  }

  /** Builds the job context that belongs to the given task attempt. */
  private JobContext getJobContext(TaskAttemptContext taskAttemptContext) {
    return new JobContext(taskAttemptContext.getConfiguration(),
        taskAttemptContext.getTaskAttemptID().getJobID());
  }

  /** Returns a fresh {@code /tmp1/<random long>} path string. */
  private static String randomTmpPath() {
    return "/tmp1/" + rand.nextLong();
  }

  /**
   * Walks the tree rooted at {@code targetBase} and compares the permission of
   * every directory below it with {@code sourcePerm}. Plain files are not
   * checked, and neither is the root directory itself.
   *
   * @param fs file system to inspect
   * @param targetBase root of the tree to walk
   * @param sourcePerm permission every sub-directory is expected to have
   * @return {@code true} if all sub-directories match, {@code false} at the
   *         first mismatch (the offending path is logged)
   * @throws IOException if the file system cannot be queried
   */
  private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
                                            FsPermission sourcePerm) throws IOException {
    Stack<Path> pending = new Stack<Path>();
    pending.push(new Path(targetBase));
    while (!pending.isEmpty()) {
      Path current = pending.pop();
      if (!fs.exists(current)) {
        continue;
      }
      FileStatus[] children = fs.listStatus(current);
      if (children == null || children.length == 0) {
        continue;
      }

      for (FileStatus child : children) {
        if (!child.isDir()) {
          continue;
        }
        if (!sourcePerm.equals(child.getPermission())) {
          LOG.error("Permission mismatch on " + child.getPath() + ": expected "
              + sourcePerm + " but found " + child.getPermission());
          return false;
        }
        pending.push(child.getPath());
      }
    }
    return true;
  }

  /** Input format that yields no splits and no record reader. */
  private static class NullInputFormat extends InputFormat<Object, Object> {
    @Override
    public List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException {
      return Collections.emptyList();
    }

    @Override
    public RecordReader<Object, Object> createRecordReader(InputSplit split,
                                                           TaskAttemptContext context)
        throws IOException, InterruptedException {
      return null;
    }
  }
}