Author: cnauroth
Date: Fri Jan 31 23:05:23 2014
New Revision: 1563327
URL: http://svn.apache.org/r1563327
Log:
Merge trunk to HDFS-4685.
Added:
hadoop/common/branches/HDFS-4685/dev-support/create-release.sh
- copied unchanged from r1563325,
hadoop/common/trunk/dev-support/create-release.sh
Modified:
hadoop/common/branches/HDFS-4685/ (props changed)
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-sls/src/main/data/2jobs2min-rumen-jh.json
Propchange: hadoop/common/branches/HDFS-4685/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk:r1562669-1563325
Modified:
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java?rev=1563327&r1=1563326&r2=1563327&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
(original)
+++
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
Fri Jan 31 23:05:23 2014
@@ -37,15 +37,16 @@ public enum DistCpOptionSwitch {
/**
* Preserves status of file/path in the target.
* Default behavior with -p, is to preserve replication,
- * block size, user, group and permission on the target file
+ * block size, user, group, permission and checksum type on the target file.
+ * Note that when preserving checksum type, block size is also preserved.
*
- * If any of the optional switches are present among rbugp, then
- * only the corresponding file attribute is preserved
+ * If any of the optional switches are present among rbugpc, then
+ * only the corresponding file attribute is preserved.
*
*/
PRESERVE_STATUS(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
- new Option("p", true, "preserve status (rbugp)" +
- "(replication, block-size, user, group, permission)")),
+ new Option("p", true, "preserve status (rbugpc)" +
+ "(replication, block-size, user, group, permission, checksum-type)")),
/**
* Update target location by copying only files that are missing
@@ -53,7 +54,7 @@ public enum DistCpOptionSwitch {
* across source and target. Typically used with DELETE_MISSING
* Incompatible with ATOMIC_COMMIT
*/
- SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
+ SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
new Option("update", false, "Update target, copying only missing" +
"files or directories")),
@@ -80,7 +81,7 @@ public enum DistCpOptionSwitch {
* Max number of maps to use during copy. DistCp will split work
* as equally as possible among these maps
*/
- MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
+ MAX_MAPS(DistCpConstants.CONF_LABEL_MAX_MAPS,
new Option("m", true, "Max number of concurrent maps to use for copy")),
/**
Modified:
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java?rev=1563327&r1=1563326&r2=1563327&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
(original)
+++
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
Fri Jan 31 23:05:23 2014
@@ -61,7 +61,7 @@ public class DistCpOptions {
private Path targetPath;
public static enum FileAttribute{
- REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION;
+ REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION, CHECKSUMTYPE;
public static FileAttribute getAttribute(char symbol) {
for (FileAttribute attribute : values()) {
Modified:
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java?rev=1563327&r1=1563326&r2=1563327&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
(original)
+++
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
Fri Jan 31 23:05:23 2014
@@ -34,7 +34,7 @@ public class OptionsParser {
private static final Log LOG = LogFactory.getLog(OptionsParser.class);
- private static final Options cliOptions = new Options();
+ private static final Options cliOptions = new Options();
static {
for (DistCpOptionSwitch option : DistCpOptionSwitch.values()) {
@@ -50,7 +50,7 @@ public class OptionsParser {
protected String[] flatten(Options options, String[] arguments, boolean
stopAtNonOption) {
for (int index = 0; index < arguments.length; index++) {
if (arguments[index].equals("-" +
DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
- arguments[index] = "-prbugp";
+ arguments[index] = "-prbugpc";
}
}
return super.flatten(options, arguments, stopAtNonOption);
@@ -125,7 +125,7 @@ public class OptionsParser {
option.setAtomicWorkPath(new Path(workPath));
}
} else if (command.hasOption(DistCpOptionSwitch.WORK_PATH.getSwitch())) {
- throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
+ throw new IllegalArgumentException("-tmp work-path can only be specified along with -atomic");
}
if (command.hasOption(DistCpOptionSwitch.LOG_PATH.getSwitch())) {
Modified:
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java?rev=1563327&r1=1563326&r2=1563327&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
(original)
+++
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java
Fri Jan 31 23:05:23 2014
@@ -111,7 +111,7 @@ public class CopyMapper extends Mapper<T
*/
private void initializeSSLConf(Context context) throws IOException {
LOG.info("Initializing SSL configuration");
-
+
String workDir = conf.get(JobContext.JOB_LOCAL_DIR) + "/work";
Path[] cacheFiles = context.getLocalCacheFiles();
@@ -294,7 +294,7 @@ public class CopyMapper extends Mapper<T
RetriableFileCopyCommand.CopyReadException) {
incrementCounter(context, Counter.FAIL, 1);
incrementCounter(context, Counter.BYTESFAILED,
sourceFileStatus.getLen());
- context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " +
+ context.write(null, new Text("FAIL: " + sourceFileStatus.getPath() + " - " +
StringUtils.stringifyException(exception)));
}
else
@@ -322,7 +322,7 @@ public class CopyMapper extends Mapper<T
targetFileStatus.getLen() != source.getLen()
|| (!skipCrc &&
!DistCpUtils.checksumsAreEqual(sourceFS,
- source.getPath(), targetFS, target))
+ source.getPath(), null, targetFS, target))
|| (source.getBlockSize() != targetFileStatus.getBlockSize() &&
preserve.contains(FileAttribute.BLOCKSIZE))
);
Modified:
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java?rev=1563327&r1=1563326&r2=1563327&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
(original)
+++
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java
Fri Jan 31 23:05:23 2014
@@ -18,23 +18,33 @@
package org.apache.hadoop.tools.mapred;
-import org.apache.hadoop.tools.util.RetriableCommand;
-import org.apache.hadoop.tools.util.ThrottledInputStream;
-import org.apache.hadoop.tools.util.DistCpUtils;
-import org.apache.hadoop.tools.DistCpOptions.*;
-import org.apache.hadoop.tools.DistCpConstants;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
+import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.tools.util.RetriableCommand;
+import org.apache.hadoop.tools.util.ThrottledInputStream;
import com.google.common.annotations.VisibleForTesting;
-import java.io.*;
-import java.util.EnumSet;
-
/**
* This class extends RetriableCommand to implement the copy of files,
* with retries on failure.
@@ -44,7 +54,7 @@ public class RetriableFileCopyCommand ex
private static Log LOG = LogFactory.getLog(RetriableFileCopyCommand.class);
private static int BUFFER_SIZE = 8 * 1024;
private boolean skipCrc = false;
-
+
/**
* Constructor, taking a description of the action.
* @param description Verbose description of the copy operation.
@@ -52,7 +62,7 @@ public class RetriableFileCopyCommand ex
public RetriableFileCopyCommand(String description) {
super(description);
}
-
+
/**
* Create a RetriableFileCopyCommand.
*
@@ -99,15 +109,21 @@ public class RetriableFileCopyCommand ex
LOG.debug("Copying " + sourceFileStatus.getPath() + " to " + target);
LOG.debug("Tmp-file path: " + tmpTargetPath);
}
- FileSystem sourceFS = sourceFileStatus.getPath().getFileSystem(
- configuration);
+ final Path sourcePath = sourceFileStatus.getPath();
+ final FileSystem sourceFS = sourcePath.getFileSystem(configuration);
+ final FileChecksum sourceChecksum = fileAttributes
+ .contains(FileAttribute.CHECKSUMTYPE) ? sourceFS
+ .getFileChecksum(sourcePath) : null;
+
long bytesRead = copyToTmpFile(tmpTargetPath, targetFS, sourceFileStatus,
- context, fileAttributes);
+ context, fileAttributes, sourceChecksum);
- compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
bytesRead);
+ compareFileLengths(sourceFileStatus, tmpTargetPath, configuration,
+ bytesRead);
//At this point, src&dest lengths are same. if length==0, we skip checksum
if ((bytesRead != 0) && (!skipCrc)) {
- compareCheckSums(sourceFS, sourceFileStatus.getPath(), targetFS,
tmpTargetPath);
+ compareCheckSums(sourceFS, sourceFileStatus.getPath(), sourceChecksum,
+ targetFS, tmpTargetPath);
}
promoteTmpToTarget(tmpTargetPath, target, targetFS);
return bytesRead;
@@ -118,14 +134,33 @@ public class RetriableFileCopyCommand ex
}
}
+ /**
+ * @return the checksum spec of the source checksum if checksum type should be
+ * preserved
+ */
+ private ChecksumOpt getChecksumOpt(EnumSet<FileAttribute> fileAttributes,
+ FileChecksum sourceChecksum) {
+ if (fileAttributes.contains(FileAttribute.CHECKSUMTYPE)
+ && sourceChecksum != null) {
+ return sourceChecksum.getChecksumOpt();
+ }
+ return null;
+ }
+
private long copyToTmpFile(Path tmpTargetPath, FileSystem targetFS,
- FileStatus sourceFileStatus, Mapper.Context
context,
- EnumSet<FileAttribute> fileAttributes)
- throws IOException {
- OutputStream outStream = new BufferedOutputStream(targetFS.create(
- tmpTargetPath, true, BUFFER_SIZE,
- getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
tmpTargetPath),
- getBlockSize(fileAttributes, sourceFileStatus, targetFS,
tmpTargetPath), context));
+ FileStatus sourceFileStatus, Mapper.Context context,
+ EnumSet<FileAttribute> fileAttributes, final FileChecksum sourceChecksum)
+ throws IOException {
+ FsPermission permission = FsPermission.getFileDefault().applyUMask(
+ FsPermission.getUMask(targetFS.getConf()));
+ OutputStream outStream = new BufferedOutputStream(
+ targetFS.create(tmpTargetPath, permission,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), BUFFER_SIZE,
+ getReplicationFactor(fileAttributes, sourceFileStatus, targetFS,
+ tmpTargetPath),
+ getBlockSize(fileAttributes, sourceFileStatus, targetFS,
+ tmpTargetPath),
+ context, getChecksumOpt(fileAttributes, sourceChecksum)));
return copyBytes(sourceFileStatus, outStream, BUFFER_SIZE, context);
}
@@ -140,9 +175,10 @@ public class RetriableFileCopyCommand ex
}
private void compareCheckSums(FileSystem sourceFS, Path source,
- FileSystem targetFS, Path target)
- throws IOException {
- if (!DistCpUtils.checksumsAreEqual(sourceFS, source, targetFS, target)) {
+ FileChecksum sourceChecksum, FileSystem targetFS, Path target)
+ throws IOException {
+ if (!DistCpUtils.checksumsAreEqual(sourceFS, source, sourceChecksum,
+ targetFS, target)) {
StringBuilder errorMessage = new StringBuilder("Check-sum mismatch between ")
.append(source).append(" and ").append(target).append(".");
if (sourceFS.getFileStatus(source).getBlockSize() !=
targetFS.getFileStatus(target).getBlockSize()) {
@@ -249,11 +285,18 @@ public class RetriableFileCopyCommand ex
sourceFile.getReplication() :
targetFS.getDefaultReplication(tmpTargetPath);
}
+ /**
+ * @return the block size of the source file if we need to preserve either
+ * the block size or the checksum type. Otherwise the default block
+ * size of the target FS.
+ */
private static long getBlockSize(
EnumSet<FileAttribute> fileAttributes,
FileStatus sourceFile, FileSystem targetFS, Path tmpTargetPath) {
- return fileAttributes.contains(FileAttribute.BLOCKSIZE)?
- sourceFile.getBlockSize() :
targetFS.getDefaultBlockSize(tmpTargetPath);
+ boolean preserve = fileAttributes.contains(FileAttribute.BLOCKSIZE)
+ || fileAttributes.contains(FileAttribute.CHECKSUMTYPE);
+ return preserve ? sourceFile.getBlockSize() : targetFS
+ .getDefaultBlockSize(tmpTargetPath);
}
/**
@@ -261,7 +304,7 @@ public class RetriableFileCopyCommand ex
* failures from other kinds of IOExceptions.
* The failure to read from source is dealt with specially, in the CopyMapper.
* Such failures may be skipped if the DistCpOptions indicate so.
- * Write failures are intolerable, and amount to CopyMapper failure.
+ * Write failures are intolerable, and amount to CopyMapper failure.
*/
public static class CopyReadException extends IOException {
public CopyReadException(Throwable rootCause) {
Modified:
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java?rev=1563327&r1=1563326&r2=1563327&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
(original)
+++
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
Fri Jan 31 23:05:23 2014
@@ -125,7 +125,7 @@ public class DistCpUtils {
* @param sourceRootPath - Source root path
* @param childPath - Path for which relative path is required
* @return - Relative portion of the child path (always prefixed with /
- * unless it is empty
+ * unless it is empty
*/
public static String getRelativePath(Path sourceRootPath, Path childPath) {
String childPathString = childPath.toUri().getPath();
@@ -277,9 +277,11 @@ public class DistCpUtils {
* If checksums's can't be retrieved, it doesn't fail the test
* Only time the comparison would fail is when checksums are
* available and they don't match
- *
+ *
* @param sourceFS FileSystem for the source path.
* @param source The source path.
+ * @param sourceChecksum The checksum of the source file. If it is null we
+ * still need to retrieve it through sourceFS.
* @param targetFS FileSystem for the target path.
* @param target The target path.
* @return If either checksum couldn't be retrieved, the function returns
@@ -288,12 +290,12 @@ public class DistCpUtils {
* @throws IOException if there's an exception while retrieving checksums.
*/
public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
- FileSystem targetFS, Path target)
- throws IOException {
- FileChecksum sourceChecksum = null;
+ FileChecksum sourceChecksum, FileSystem targetFS, Path target)
+ throws IOException {
FileChecksum targetChecksum = null;
try {
- sourceChecksum = sourceFS.getFileChecksum(source);
+ sourceChecksum = sourceChecksum != null ? sourceChecksum : sourceFS
+ .getFileChecksum(source);
targetChecksum = targetFS.getFileChecksum(target);
} catch (IOException e) {
LOG.error("Unable to retrieve checksum for " + source + " or " + target,
e);
Modified:
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java?rev=1563327&r1=1563326&r2=1563327&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
(original)
+++
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
Fri Jan 31 23:05:23 2014
@@ -110,7 +110,7 @@ public class TestOptionsParser {
"hdfs://localhost:8020/target/"});
Assert.assertEquals(options.getMapBandwidth(), 11);
}
-
+
@Test(expected=IllegalArgumentException.class)
public void testParseNonPositiveBandwidth() {
OptionsParser.parse(new String[] {
@@ -119,7 +119,7 @@ public class TestOptionsParser {
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
}
-
+
@Test(expected=IllegalArgumentException.class)
public void testParseZeroBandwidth() {
OptionsParser.parse(new String[] {
@@ -397,6 +397,7 @@ public class TestOptionsParser {
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-p",
@@ -408,6 +409,7 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-p",
@@ -418,6 +420,7 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-pbr",
@@ -429,6 +432,7 @@ public class TestOptionsParser {
Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-pbrgup",
@@ -440,6 +444,31 @@ public class TestOptionsParser {
Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+
+ options = OptionsParser.parse(new String[] {
+ "-pbrgupc",
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.REPLICATION));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.PERMISSION));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.USER));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.GROUP));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
+
+ options = OptionsParser.parse(new String[] {
+ "-pc",
+ "-f",
+ "hdfs://localhost:8020/source/first",
+ "hdfs://localhost:8020/target/"});
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.BLOCKSIZE));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.REPLICATION));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.PERMISSION));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.USER));
+ Assert.assertFalse(options.shouldPreserve(FileAttribute.GROUP));
+ Assert.assertTrue(options.shouldPreserve(FileAttribute.CHECKSUMTYPE));
options = OptionsParser.parse(new String[] {
"-p",
@@ -452,7 +481,7 @@ public class TestOptionsParser {
attribIterator.next();
i++;
}
- Assert.assertEquals(i, 5);
+ Assert.assertEquals(i, 6);
try {
OptionsParser.parse(new String[] {
Modified:
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java?rev=1563327&r1=1563326&r2=1563327&view=diff
==============================================================================
---
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
(original)
+++
hadoop/common/branches/HDFS-4685/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java
Fri Jan 31 23:05:23 2014
@@ -18,18 +18,28 @@
package org.apache.hadoop.tools.mapred;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.DistCpConstants;
@@ -37,23 +47,17 @@ import org.apache.hadoop.tools.DistCpOpt
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.StubContext;
import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.util.DataChecksum;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-
public class TestCopyMapper {
private static final Log LOG = LogFactory.getLog(TestCopyMapper.class);
private static List<Path> pathList = new ArrayList<Path>();
private static int nFiles = 0;
private static final int DEFAULT_FILE_SIZE = 1024;
+ private static final long NON_DEFAULT_BLOCK_SIZE = 4096;
private static MiniDFSCluster cluster;
@@ -119,12 +123,27 @@ public class TestCopyMapper {
mkdirs(SOURCE_PATH + "/2/3/4");
mkdirs(SOURCE_PATH + "/2/3");
mkdirs(SOURCE_PATH + "/5");
- touchFile(SOURCE_PATH + "/5/6", true);
+ touchFile(SOURCE_PATH + "/5/6", true, null);
mkdirs(SOURCE_PATH + "/7");
mkdirs(SOURCE_PATH + "/7/8");
touchFile(SOURCE_PATH + "/7/8/9");
}
+ private static void createSourceDataWithDifferentChecksumType()
+ throws Exception {
+ mkdirs(SOURCE_PATH + "/1");
+ mkdirs(SOURCE_PATH + "/2");
+ mkdirs(SOURCE_PATH + "/2/3/4");
+ mkdirs(SOURCE_PATH + "/2/3");
+ mkdirs(SOURCE_PATH + "/5");
+ touchFile(SOURCE_PATH + "/5/6", new ChecksumOpt(DataChecksum.Type.CRC32,
+ 512));
+ mkdirs(SOURCE_PATH + "/7");
+ mkdirs(SOURCE_PATH + "/7/8");
+ touchFile(SOURCE_PATH + "/7/8/9", new ChecksumOpt(DataChecksum.Type.CRC32C,
+ 512));
+ }
+
private static void mkdirs(String path) throws Exception {
FileSystem fileSystem = cluster.getFileSystem();
final Path qualifiedPath = new
Path(path).makeQualified(fileSystem.getUri(),
@@ -134,21 +153,31 @@ public class TestCopyMapper {
}
private static void touchFile(String path) throws Exception {
- touchFile(path, false);
+ touchFile(path, false, null);
}
- private static void touchFile(String path, boolean createMultipleBlocks)
throws Exception {
- final long NON_DEFAULT_BLOCK_SIZE = 4096;
+ private static void touchFile(String path, ChecksumOpt checksumOpt)
+ throws Exception {
+ // create files with specific checksum opt and non-default block size
+ touchFile(path, true, checksumOpt);
+ }
+
+ private static void touchFile(String path, boolean createMultipleBlocks,
+ ChecksumOpt checksumOpt) throws Exception {
FileSystem fs;
DataOutputStream outputStream = null;
try {
fs = cluster.getFileSystem();
final Path qualifiedPath = new Path(path).makeQualified(fs.getUri(),
-
fs.getWorkingDirectory());
- final long blockSize = createMultipleBlocks? NON_DEFAULT_BLOCK_SIZE :
fs.getDefaultBlockSize(qualifiedPath) * 2;
- outputStream = fs.create(qualifiedPath, true, 0,
- (short)(fs.getDefaultReplication(qualifiedPath)*2),
- blockSize);
+ fs.getWorkingDirectory());
+ final long blockSize = createMultipleBlocks ? NON_DEFAULT_BLOCK_SIZE : fs
+ .getDefaultBlockSize(qualifiedPath) * 2;
+ FsPermission permission = FsPermission.getFileDefault().applyUMask(
+ FsPermission.getUMask(fs.getConf()));
+ outputStream = fs.create(qualifiedPath, permission,
+ EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 0,
+ (short) (fs.getDefaultReplication(qualifiedPath) * 2), blockSize,
+ null, checksumOpt);
byte[] bytes = new byte[DEFAULT_FILE_SIZE];
outputStream.write(bytes);
long fileSize = DEFAULT_FILE_SIZE;
@@ -171,17 +200,40 @@ public class TestCopyMapper {
}
}
+ @Test
+ public void testCopyWithDifferentChecksumType() throws Exception {
+ testCopy(true);
+ }
+
@Test(timeout=40000)
public void testRun() {
+ testCopy(false);
+ }
+
+ private void testCopy(boolean preserveChecksum) {
try {
deleteState();
- createSourceData();
+ if (preserveChecksum) {
+ createSourceDataWithDifferentChecksumType();
+ } else {
+ createSourceData();
+ }
FileSystem fs = cluster.getFileSystem();
CopyMapper copyMapper = new CopyMapper();
StubContext stubContext = new StubContext(getConfiguration(), null, 0);
Mapper<Text, FileStatus, Text, Text>.Context context
= stubContext.getContext();
+
+ Configuration configuration = context.getConfiguration();
+ EnumSet<DistCpOptions.FileAttribute> fileAttributes
+ = EnumSet.of(DistCpOptions.FileAttribute.REPLICATION);
+ if (preserveChecksum) {
+ fileAttributes.add(DistCpOptions.FileAttribute.CHECKSUMTYPE);
+ }
+ configuration.set(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel(),
+ DistCpUtils.packAttributes(fileAttributes));
+
copyMapper.setup(context);
for (Path path: pathList) {
@@ -195,19 +247,29 @@ public class TestCopyMapper {
.replaceAll(SOURCE_PATH, TARGET_PATH));
Assert.assertTrue(fs.exists(targetPath));
Assert.assertTrue(fs.isFile(targetPath) == fs.isFile(path));
- Assert.assertEquals(fs.getFileStatus(path).getReplication(),
- fs.getFileStatus(targetPath).getReplication());
- Assert.assertEquals(fs.getFileStatus(path).getBlockSize(),
- fs.getFileStatus(targetPath).getBlockSize());
- Assert.assertTrue(!fs.isFile(targetPath) ||
- fs.getFileChecksum(targetPath).equals(
- fs.getFileChecksum(path)));
+ FileStatus sourceStatus = fs.getFileStatus(path);
+ FileStatus targetStatus = fs.getFileStatus(targetPath);
+ Assert.assertEquals(sourceStatus.getReplication(),
+ targetStatus.getReplication());
+ if (preserveChecksum) {
+ Assert.assertEquals(sourceStatus.getBlockSize(),
+ targetStatus.getBlockSize());
+ }
+ Assert.assertTrue(!fs.isFile(targetPath)
+ ||
fs.getFileChecksum(targetPath).equals(fs.getFileChecksum(path)));
}
Assert.assertEquals(pathList.size(),
stubContext.getReporter().getCounter(CopyMapper.Counter.COPY).getValue());
- Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE,
-
stubContext.getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED).getValue());
+ if (!preserveChecksum) {
+ Assert.assertEquals(nFiles * DEFAULT_FILE_SIZE, stubContext
+ .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+ .getValue());
+ } else {
+ Assert.assertEquals(nFiles * NON_DEFAULT_BLOCK_SIZE * 2, stubContext
+ .getReporter().getCounter(CopyMapper.Counter.BYTESCOPIED)
+ .getValue());
+ }
testCopyingExistingFiles(fs, copyMapper, context);
for (Text value : stubContext.getWriter().values()) {
@@ -309,7 +371,7 @@ public class TestCopyMapper {
UserGroupInformation tmpUser =
UserGroupInformation.createRemoteUser("guest");
final CopyMapper copyMapper = new CopyMapper();
-
+
final Mapper<Text, FileStatus, Text, Text>.Context context = tmpUser.
doAs(new PrivilegedAction<Mapper<Text, FileStatus, Text,
Text>.Context>() {
@Override
@@ -535,7 +597,7 @@ public class TestCopyMapper {
final Mapper<Text, FileStatus, Text, Text>.Context context
= stubContext.getContext();
-
+
context.getConfiguration().set(DistCpConstants.CONF_LABEL_PRESERVE_STATUS,
DistCpUtils.packAttributes(preserveStatus));