Author: amareshwari
Date: Fri Mar 22 10:09:06 2013
New Revision: 1459690
URL: http://svn.apache.org/r1459690
Log:
MAPREDUCE-5014. Extend Distcp to accept a custom CopyListing. Contributed by
Srikanth Sundarrajan
Modified:
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
Modified:
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java Fri Mar 22 10:09:06 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.tools.util.Dist
import org.apache.hadoop.security.Credentials;
import java.io.IOException;
+import java.lang.reflect.Constructor;
/**
* The CopyListing abstraction is responsible for how the list of
@@ -193,14 +194,34 @@ public abstract class CopyListing extend
* @param credentials Credentials object on which the FS delegation tokens are cached
* @param options The input Options, to help choose the appropriate CopyListing Implementation.
* @return An instance of the appropriate CopyListing implementation.
+ * @throws java.io.IOException - Exception if any
*/
public static CopyListing getCopyListing(Configuration configuration,
Credentials credentials,
- DistCpOptions options) {
- if (options.getSourceFileListing() == null) {
- return new GlobbedCopyListing(configuration, credentials);
- } else {
- return new FileBasedCopyListing(configuration, credentials);
+ DistCpOptions options)
+ throws IOException {
+
+ String copyListingClassName = configuration.get(DistCpConstants.
+ CONF_LABEL_COPY_LISTING_CLASS, "");
+ Class<? extends CopyListing> copyListingClass;
+ try {
+ if (! copyListingClassName.isEmpty()) {
+ copyListingClass = configuration.getClass(DistCpConstants.
+ CONF_LABEL_COPY_LISTING_CLASS, GlobbedCopyListing.class,
+ CopyListing.class);
+ } else {
+ if (options.getSourceFileListing() == null) {
+ copyListingClass = GlobbedCopyListing.class;
+ } else {
+ copyListingClass = FileBasedCopyListing.class;
+ }
+ }
+ copyListingClassName = copyListingClass.getName();
+ Constructor<? extends CopyListing> constructor = copyListingClass.
+ getDeclaredConstructor(Configuration.class, Credentials.class);
+ return constructor.newInstance(configuration, credentials);
+ } catch (Exception e) {
+ throw new IOException("Unable to instantiate " + copyListingClassName, e);
}
}
Modified:
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCp.java Fri Mar 22 10:09:06 2013
@@ -319,7 +319,7 @@ public class DistCp extends Configured i
* @return Returns the path where the copy listing is created
* @throws IOException - If any
*/
- private Path createInputFileListing(Job job) throws IOException {
+ protected Path createInputFileListing(Job job) throws IOException {
Path fileListingPath = getFileListingPath();
CopyListing copyListing =
CopyListing.getCopyListing(job.getConfiguration(),
job.getCredentials(), inputOptions);
@@ -334,7 +334,7 @@ public class DistCp extends Configured i
* @return - Path where the copy listing file has to be saved
* @throws IOException - Exception if any
*/
- private Path getFileListingPath() throws IOException {
+ protected Path getFileListingPath() throws IOException {
String fileListPathStr = metaFolder + "/fileList.seq";
Path path = new Path(fileListPathStr);
return new Path(path.toUri().normalize().toString());
Modified:
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java Fri Mar 22 10:09:06 2013
@@ -82,6 +82,9 @@ public class DistCpConstants {
/* Meta folder where the job's intermediate data is kept */
public static final String CONF_LABEL_META_FOLDER = "distcp.meta.folder";
+ /* DistCp CopyListing class override param */
+ public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
+
/**
* Conf label for SSL Trust-store location.
*/
Modified:
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java Fri Mar 22 10:09:06 2013
@@ -127,17 +127,20 @@ public class SimpleCopyListing extends C
if (LOG.isDebugEnabled()) {
LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
}
- writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot, localFile);
+ writeToFileListing(fileListWriter, sourceStatus, sourcePathRoot,
+ localFile, options);
if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Traversing non-empty source dir: " +
sourceStatus.getPath());
}
- traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, localFile);
+ traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot,
+ localFile, options);
}
}
} else {
- writeToFileListing(fileListWriter, rootStatus, sourcePathRoot, localFile);
+ writeToFileListing(fileListWriter, rootStatus, sourcePathRoot,
+ localFile, options);
}
}
} finally {
@@ -169,6 +172,17 @@ public class SimpleCopyListing extends C
}
}
+ /**
+ * Provide an option to skip copy of a path, Allows for exclusion
+ * of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}
+ * @param path - Path being considered for copy while building the file listing
+ * @param options - Input options passed during DistCp invocation
+ * @return - True if the path should be considered for copy, false otherwise
+ */
+ protected boolean shouldCopy(Path path, DistCpOptions options) {
+ return true;
+ }
+
/** {@inheritDoc} */
@Override
protected long getBytesToCopy() {
@@ -210,7 +224,9 @@ public class SimpleCopyListing extends C
private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter,
FileStatus sourceStatus,
- Path sourcePathRoot, boolean localFile)
+ Path sourcePathRoot,
+ boolean localFile,
+ DistCpOptions options)
throws IOException {
FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
Stack<FileStatus> pathStack = new Stack<FileStatus>();
@@ -221,7 +237,8 @@ public class SimpleCopyListing extends C
if (LOG.isDebugEnabled())
LOG.debug("Recording source-path: "
+ sourceStatus.getPath() + " for copy.");
- writeToFileListing(fileListWriter, child, sourcePathRoot, localFile);
+ writeToFileListing(fileListWriter, child, sourcePathRoot,
+ localFile, options);
if (isDirectoryAndNotEmpty(sourceFS, child)) {
if (LOG.isDebugEnabled())
LOG.debug("Traversing non-empty source dir: "
@@ -233,8 +250,10 @@ public class SimpleCopyListing extends C
}
private void writeToFileListing(SequenceFile.Writer fileListWriter,
- FileStatus fileStatus, Path sourcePathRoot,
- boolean localFile) throws IOException {
+ FileStatus fileStatus,
+ Path sourcePathRoot,
+ boolean localFile,
+ DistCpOptions options) throws IOException {
if (fileStatus.getPath().equals(sourcePathRoot) &&
fileStatus.isDirectory())
return; // Skip the root-paths.
@@ -248,6 +267,10 @@ public class SimpleCopyListing extends C
status = getFileStatus(fileStatus);
}
+ if (!shouldCopy(fileStatus.getPath(), options)) {
+ return;
+ }
+
fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
fileStatus.getPath())), status);
fileListWriter.sync();
Modified:
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java Fri Mar 22 10:09:06 2013
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.Credentials;
@@ -79,7 +80,39 @@ public class TestCopyListing extends Sim
return 0;
}
- @Test
+ @Test(timeout=10000)
+ public void testSkipCopy() throws Exception {
+ SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS) {
+ @Override
+ protected boolean shouldCopy(Path path, DistCpOptions options) {
+ return !path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME);
+ }
+ };
+ FileSystem fs = FileSystem.get(getConf());
+ List<Path> srcPaths = new ArrayList<Path>();
+ srcPaths.add(new Path("/tmp/in4/1"));
+ srcPaths.add(new Path("/tmp/in4/2"));
+ Path target = new Path("/tmp/out4/1");
+ TestDistCpUtils.createFile(fs, "/tmp/in4/1/_SUCCESS");
+ TestDistCpUtils.createFile(fs, "/tmp/in4/1/file");
+ TestDistCpUtils.createFile(fs, "/tmp/in4/2");
+ fs.mkdirs(target);
+ DistCpOptions options = new DistCpOptions(srcPaths, target);
+ Path listingFile = new Path("/tmp/list4");
+ listing.buildListing(listingFile, options);
+ Assert.assertEquals(listing.getNumberOfPaths(), 2);
+ SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
+ SequenceFile.Reader.file(listingFile));
+ FileStatus fileStatus = new FileStatus();
+ Text relativePath = new Text();
+ Assert.assertTrue(reader.next(relativePath, fileStatus));
+ Assert.assertEquals(relativePath.toString(), "/1/file");
+ Assert.assertTrue(reader.next(relativePath, fileStatus));
+ Assert.assertEquals(relativePath.toString(), "/2");
+ Assert.assertFalse(reader.next(relativePath, fileStatus));
+ }
+
+ @Test(timeout=10000)
public void testMultipleSrcToFile() {
FileSystem fs = null;
try {
@@ -124,7 +157,7 @@ public class TestCopyListing extends Sim
}
}
- @Test
+ @Test(timeout=10000)
public void testDuplicates() {
FileSystem fs = null;
try {
@@ -150,7 +183,7 @@ public class TestCopyListing extends Sim
}
}
- @Test
+ @Test(timeout=10000)
public void testBuildListing() {
FileSystem fs = null;
try {
@@ -206,7 +239,7 @@ public class TestCopyListing extends Sim
}
}
- @Test
+ @Test(timeout=10000)
public void testBuildListingForSingleFile() {
FileSystem fs = null;
String testRootString = "/singleFileListing";
Modified:
hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
URL:
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java?rev=1459690&r1=1459689&r2=1459690&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java (original)
+++ hadoop/common/trunk/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java Fri Mar 22 10:09:06 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -34,6 +35,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
public class TestIntegration {
@@ -68,7 +70,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testSingleFileMissingTarget() {
caseSingleFileMissingTarget(false);
caseSingleFileMissingTarget(true);
@@ -91,7 +93,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testSingleFileTargetFile() {
caseSingleFileTargetFile(false);
caseSingleFileTargetFile(true);
@@ -114,7 +116,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testSingleFileTargetDir() {
caseSingleFileTargetDir(false);
caseSingleFileTargetDir(true);
@@ -138,7 +140,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testSingleDirTargetMissing() {
caseSingleDirTargetMissing(false);
caseSingleDirTargetMissing(true);
@@ -161,7 +163,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testSingleDirTargetPresent() {
try {
@@ -180,7 +182,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testUpdateSingleDirTargetPresent() {
try {
@@ -199,7 +201,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testMultiFileTargetPresent() {
caseMultiFileTargetPresent(false);
caseMultiFileTargetPresent(true);
@@ -223,7 +225,56 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
+ public void testCustomCopyListing() {
+
+ try {
+ addEntries(listFile, "multifile1/file3", "multifile1/file4", "multifile1/file5");
+ createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5");
+ mkdirs(target.toString());
+
+ Configuration conf = getConf();
+ try {
+ conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
+ CustomCopyListing.class, CopyListing.class);
+ DistCpOptions options = new DistCpOptions(Arrays.
+ asList(new Path(root + "/" + "multifile1")), target);
+ options.setSyncFolder(true);
+ options.setDeleteMissing(false);
+ options.setOverwrite(false);
+ try {
+ new DistCp(conf, options).execute();
+ } catch (Exception e) {
+ LOG.error("Exception encountered ", e);
+ throw new IOException(e);
+ }
+ } finally {
+ conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS);
+ }
+
+ checkResult(target, 2, "file4", "file5");
+ } catch (IOException e) {
+ LOG.error("Exception encountered while testing distcp", e);
+ Assert.fail("distcp failure");
+ } finally {
+ TestDistCpUtils.delete(fs, root);
+ }
+ }
+
+ private static class CustomCopyListing extends SimpleCopyListing {
+
+ public CustomCopyListing(Configuration configuration,
+ Credentials credentials) {
+ super(configuration, credentials);
+ }
+
+ @Override
+ protected boolean shouldCopy(Path path, DistCpOptions options) {
+ return !path.getName().equals("file3");
+ }
+ }
+
+ @Test(timeout=100000)
public void testMultiFileTargetMissing() {
caseMultiFileTargetMissing(false);
caseMultiFileTargetMissing(true);
@@ -246,7 +297,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testMultiDirTargetPresent() {
try {
@@ -265,7 +316,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testUpdateMultiDirTargetPresent() {
try {
@@ -284,7 +335,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testMultiDirTargetMissing() {
try {
@@ -304,7 +355,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testUpdateMultiDirTargetMissing() {
try {
@@ -323,7 +374,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testDeleteMissingInDestination() {
try {
@@ -343,7 +394,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testOverwrite() {
byte[] contents1 = "contents1".getBytes();
byte[] contents2 = "contents2".getBytes();
@@ -375,7 +426,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testGlobTargetMissingSingleLevel() {
try {
@@ -398,7 +449,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testUpdateGlobTargetMissingSingleLevel() {
try {
@@ -420,7 +471,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testGlobTargetMissingMultiLevel() {
try {
@@ -444,7 +495,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testUpdateGlobTargetMissingMultiLevel() {
try {
@@ -468,7 +519,7 @@ public class TestIntegration {
}
}
- @Test
+ @Test(timeout=100000)
public void testCleanup() {
try {
Path sourcePath = new Path("noscheme:///file");