Author: atm
Date: Tue Jan 7 00:24:09 2014
New Revision: 1556082
URL: http://svn.apache.org/r1556082
Log:
HDFS-5685. DistCp will fail to copy with -delete switch. Contributed by Yongjun
Zhang.
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1556082&r1=1556081&r2=1556082&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Jan 7 00:24:09 2014
@@ -181,6 +181,9 @@ Release 1.3.0 - unreleased
MAPREDUCE-5698. Backport MAPREDUCE-1285 to branch-1 (Yongjun Zhang via
Sandy Ryza)
+ HDFS-5685. DistCp will fail to copy with -delete switch. (Yongjun Zhang
+ via atm)
+
Release 1.2.2 - unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=1556082&r1=1556081&r2=1556082&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/fs/TestCopyFiles.java Tue Jan 7 00:24:09 2014
@@ -49,6 +49,10 @@ import org.apache.log4j.Level;
* A JUnit test for copying files recursively.
*/
public class TestCopyFiles extends TestCase {
+
+ private static final String JT_STAGING_AREA_ROOT =
"mapreduce.jobtracker.staging.root.dir";
+ private static final String JT_STAGING_AREA_ROOT_DEFAULT =
"/tmp/hadoop/mapred/staging";
+
{
((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
).getLogger().setLevel(Level.OFF);
@@ -56,8 +60,9 @@ public class TestCopyFiles extends TestC
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL);
}
-
- static final URI LOCAL_FS = URI.create("file:///");
+
+ private static final String LOCAL_FS_STR = "file:///";
+ private static final URI LOCAL_FS_URI = URI.create(LOCAL_FS_STR);
private static final Random RAN = new Random();
private static final int NFILES = 20;
@@ -255,11 +260,11 @@ public class TestCopyFiles extends TestC
/** copy files from local file system to local file system */
public void testCopyFromLocalToLocal() throws Exception {
Configuration conf = new Configuration();
- FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
- MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
+ FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf);
+ MyFile[] files = createFiles(LOCAL_FS_URI, TEST_ROOT_DIR+"/srcdat");
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
- "file:///"+TEST_ROOT_DIR+"/destdat"});
+ new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
+
LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"});
assertTrue("Source and destination directories do not match.",
checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
deldir(localfs, TEST_ROOT_DIR+"/destdat");
@@ -305,11 +310,11 @@ public class TestCopyFiles extends TestC
final FileSystem hdfs = cluster.getFileSystem();
final String namenode = hdfs.getUri().toString();
if (namenode.startsWith("hdfs://")) {
- MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
+ MyFile[] files = createFiles(LOCAL_FS_URI, TEST_ROOT_DIR+"/srcdat");
ToolRunner.run(new DistCp(conf), new String[] {
"-log",
namenode+"/logs",
- "file:///"+TEST_ROOT_DIR+"/srcdat",
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
namenode+"/destdat"});
assertTrue("Source and destination directories do not match.",
checkFiles(cluster.getFileSystem(), "/destdat", files));
@@ -317,7 +322,7 @@ public class TestCopyFiles extends TestC
hdfs.exists(new Path(namenode+"/logs")));
deldir(hdfs, "/destdat");
deldir(hdfs, "/logs");
- deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
+ deldir(FileSystem.get(LOCAL_FS_URI, conf), TEST_ROOT_DIR+"/srcdat");
}
} finally {
if (cluster != null) { cluster.shutdown(); }
@@ -329,7 +334,7 @@ public class TestCopyFiles extends TestC
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
+ final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf);
cluster = new MiniDFSCluster(conf, 1, true, null);
final FileSystem hdfs = cluster.getFileSystem();
final String namenode = FileSystem.getDefaultUri(conf).toString();
@@ -339,7 +344,7 @@ public class TestCopyFiles extends TestC
"-log",
"/logs",
namenode+"/srcdat",
- "file:///"+TEST_ROOT_DIR+"/destdat"});
+
LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"});
assertTrue("Source and destination directories do not match.",
checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
assertTrue("Log directory does not exist.",
@@ -413,7 +418,7 @@ public class TestCopyFiles extends TestC
}
}
- public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception {
+ public void testCopyDfsToDfsUpdateWithSkipCRC() throws Exception {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
@@ -488,21 +493,119 @@ public class TestCopyFiles extends TestC
}
}
+  /**
+   * A helper function to test copying files between the local file system and
+   * the dfs file system with "-update -delete", with the job staging-area root
+   * pointed at a directory inside the local side of the copy (under the source
+   * tree when the source is local, under the destination tree otherwise).
+   *
+   * @param srcFs   file system holding the source tree
+   * @param destFs  file system holding the destination tree
+   * @param cluster mini cluster backing the dfs side; it is always shut down
+   *                before this method returns, whether or not the test passes
+   * @param conf    configuration handed to DistCp; its staging-area root is
+   *                overridden for the copy and restored before returning
+   */
+  private void stagingAreaTest(final FileSystem srcFs, final FileSystem destFs,
+      MiniDFSCluster cluster, Configuration conf) throws Exception {
+    // Remembered outside the try so the caller's configuration is restored
+    // in the finally block even when an assertion fails.
+    String prevStagingArea = null;
+    try {
+      final String fileDir = "/files";
+      final String srcParent = "/srcdat";
+      final String destParent = "/destdata";
+      final String source = srcParent + fileDir;
+      final String destination = destParent + fileDir;
+      final String logs = "/logs";
+      String logDir = TEST_ROOT_DIR + logs;
+
+      URI srcUri = srcFs.getUri();
+      URI destUri = destFs.getUri();
+
+      final boolean isSrcLocalFs =
+          srcUri.getScheme().equals(LOCAL_FS_URI.getScheme());
+
+      final FileSystem localFs = FileSystem.get(LOCAL_FS_URI, conf);
+      prevStagingArea =
+          conf.get(JT_STAGING_AREA_ROOT, JT_STAGING_AREA_ROOT_DEFAULT);
+      // Place the staging area inside the tree DistCp lists, so the copy (and
+      // -delete in particular) must leave the job directory alone.
+      String newStagingArea =
+          (isSrcLocalFs ? source : destination) + "/STAGING";
+      conf.set(JT_STAGING_AREA_ROOT, TEST_ROOT_DIR + newStagingArea);
+
+      // Only the local side lives under TEST_ROOT_DIR; dfs paths are absolute.
+      final String srcParentPrefix = isSrcLocalFs ? TEST_ROOT_DIR : "";
+      final String destParentPrefix = isSrcLocalFs ? "" : TEST_ROOT_DIR;
+
+      String createDelSrcParent = srcParentPrefix + srcParent;
+      String createDelDestParent = destParentPrefix + destParent;
+      String createDelSrc = createDelSrcParent + fileDir;
+      String createDelDest = createDelDestParent + fileDir;
+
+      MyFile[] srcFiles = createFiles(srcUri, createDelSrc);
+      // Pre-populate the destination too; presumably these files differ from
+      // the source set, giving -delete something to remove.
+      createFiles(destUri, createDelDest);
+
+      String distcpSrc = String.valueOf(srcUri) + createDelSrc;
+      String distcpDest = String.valueOf(destUri) + createDelDest;
+
+      final int exitCode = ToolRunner.run(new DistCp(conf), new String[] {
+          "-log",
+          LOCAL_FS_STR + logDir,
+          "-update",
+          "-delete",
+          distcpSrc,
+          distcpDest});
+      assertEquals("DistCp did not exit successfully.", 0, exitCode);
+
+      assertTrue("Source and destination directories do not match.",
+          checkFiles(destFs, createDelDest, srcFiles));
+
+      deldir(localFs, logDir);
+      deldir(srcFs, createDelSrcParent);
+      deldir(destFs, createDelDestParent);
+    } finally {
+      if (prevStagingArea != null) {
+        conf.set(JT_STAGING_AREA_ROOT, prevStagingArea);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+ /**
+ * test copying files from local file system to dfs file system with staging
+ * area in src
+ */
+ public void testCopyFromLocalToDfsWithStagingAreaInSrc() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+
+ String namenode = FileSystem.getDefaultUri(conf).toString();
+ assertTrue("Name node doesn't start with hdfs://",
namenode.startsWith("hdfs://"));
+
+ final FileSystem srcFs = FileSystem.get(LOCAL_FS_URI, conf);
+ final FileSystem destFs = cluster.getFileSystem();
+
+ stagingAreaTest(srcFs, destFs, cluster, conf);
+ }
+
+ /**
+ * test copying files from dfs file system to local file system with staging
+ * area in dest
+ */
+ public void testCopyFromDfsToLocalWithStagingAreaInDest() throws Exception {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+
+ String namenode = FileSystem.getDefaultUri(conf).toString();
+ assertTrue("Name node doesn't start with hdfs://",
namenode.startsWith("hdfs://"));
+
+ final FileSystem srcFs = cluster.getFileSystem();
+ final FileSystem destFs = FileSystem.get(LOCAL_FS_URI, conf);
+
+ stagingAreaTest(srcFs, destFs, cluster, conf);
+ }
+
public void testCopyDuplication() throws Exception {
- final FileSystem localfs = FileSystem.get(LOCAL_FS, new Configuration());
+ final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, new
Configuration());
try {
MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat");
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
- "file:///"+TEST_ROOT_DIR+"/src2/srcdat"});
+ new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/src2/srcdat"});
assertTrue("Source and destination directories do not match.",
checkFiles(localfs, TEST_ROOT_DIR+"/src2/srcdat", files));
assertEquals(DistCp.DuplicationException.ERROR_CODE,
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
- "file:///"+TEST_ROOT_DIR+"/src2/srcdat",
- "file:///"+TEST_ROOT_DIR+"/destdat",}));
+ new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/src2/srcdat",
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat",}));
}
finally {
deldir(localfs, TEST_ROOT_DIR+"/destdat");
@@ -512,14 +615,14 @@ public class TestCopyFiles extends TestC
}
public void testCopySingleFile() throws Exception {
- FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration());
+ FileSystem fs = FileSystem.get(LOCAL_FS_URI, new Configuration());
Path root = new Path(TEST_ROOT_DIR+"/srcdat");
try {
MyFile[] files = {createFile(root, fs)};
//copy a dir with a single file
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
- "file:///"+TEST_ROOT_DIR+"/destdat"});
+ new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat",
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/destdat"});
assertTrue("Source and destination directories do not match.",
checkFiles(fs, TEST_ROOT_DIR+"/destdat", files));
@@ -528,8 +631,8 @@ public class TestCopyFiles extends TestC
Path p = new Path(root, fname);
FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
ToolRunner.run(new DistCp(new Configuration()),
- new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname,
- "file:///"+TEST_ROOT_DIR+"/dest2/"+fname});
+ new String[] {LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+fname,
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"+fname});
assertTrue("Source and destination directories do not match.",
checkFiles(fs, TEST_ROOT_DIR+"/dest2", files));
//copy single file to existing dir
@@ -539,16 +642,16 @@ public class TestCopyFiles extends TestC
String sname = files2[0].getName();
ToolRunner.run(new DistCp(new Configuration()),
new String[] {"-update",
- "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
- "file:///"+TEST_ROOT_DIR+"/dest2/"});
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname,
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"});
assertTrue("Source and destination directories do not match.",
checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));
updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1);
//copy single file to existing dir w/ dst name conflict
ToolRunner.run(new DistCp(new Configuration()),
new String[] {"-update",
- "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
- "file:///"+TEST_ROOT_DIR+"/dest2/"});
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/srcdat/"+sname,
+ LOCAL_FS_STR+TEST_ROOT_DIR+"/dest2/"});
assertTrue("Source and destination directories do not match.",
checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));
}
@@ -917,7 +1020,7 @@ public class TestCopyFiles extends TestC
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
- final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
+ final FileSystem localfs = FileSystem.get(LOCAL_FS_URI, conf);
cluster = new MiniDFSCluster(conf, 1, true, null);
final FileSystem hdfs = cluster.getFileSystem();
final String namenode = FileSystem.getDefaultUri(conf).toString();
@@ -927,7 +1030,7 @@ public class TestCopyFiles extends TestC
MyFile[] localFiles = createFiles(localfs, destdir);
ToolRunner.run(new DistCp(conf), new String[] { "-delete", "-update",
"-log", "/logs", namenode + "/srcdat",
- "file:///" + TEST_ROOT_DIR + "/destdat" });
+ LOCAL_FS_STR + TEST_ROOT_DIR + "/destdat" });
assertTrue("Source and destination directories do not match.",
checkFiles(localfs, destdir, files));
assertTrue("Log directory does not exist.",
Modified: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java?rev=1556082&r1=1556081&r2=1556082&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java (original)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/DistCp.java Tue Jan 7 00:24:09 2014
@@ -1091,14 +1091,21 @@ public class DistCp implements Tool {
(args.srcs.size() == 1 && !dstExists) || update || overwrite;
int srcCount = 0, cnsyncf = 0, dirsyn = 0;
long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
+ final FileStatus jobDirStat = jobfs.getFileStatus(jobDirectory);
try {
for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
final Path src = srcItr.next();
FileSystem srcfs = src.getFileSystem(conf);
FileStatus srcfilestat = srcfs.getFileStatus(src);
Path root = special && srcfilestat.isDir()? src: src.getParent();
+ final boolean needToFilterJobDir = srcfs.equals(jobfs);
+
if (srcfilestat.isDir()) {
- ++srcCount;
+ if (needToFilterJobDir && (srcfilestat.compareTo(jobDirStat) == 0)) {
+ continue;
+ } else {
+ ++srcCount;
+ }
}
Stack<FileStatus> pathstack = new Stack<FileStatus>();
@@ -1109,12 +1116,17 @@ public class DistCp implements Tool {
boolean skipfile = false;
final FileStatus child = children[i];
final String dst = makeRelative(root, child.getPath());
- ++srcCount;
if (child.isDir()) {
- pathstack.push(child);
+ if (needToFilterJobDir && (child.compareTo(jobDirStat) == 0)) {
+ continue;
+ } else {
+ ++srcCount;
+ pathstack.push(child);
+ }
}
else {
+ ++srcCount;
//skip file if the src and the dst files are the same.
skipfile = update &&
sameFile(srcfs, child, dstfs,
@@ -1272,17 +1284,22 @@ public class DistCp implements Tool {
+ ") is not a directory.");
}
- //write dst lsr results
+ // write dst lsr results
+ final boolean needToFilterJobDir = dstfs.equals(jobfs);
+ final FileStatus jobDirStat = jobfs.getFileStatus(jobdir);
final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs,
jobconf,
dstlsr, Text.class, dstroot.getClass(),
SequenceFile.CompressionType.NONE);
try {
- //do lsr to get all file statuses in dstroot
+ // do lsr to get all file statuses in dstroot
final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
final FileStatus status = lsrstack.pop();
if (status.isDir()) {
+ if (needToFilterJobDir && (status.compareTo(jobDirStat) == 0)) {
+ continue;
+ }
for(FileStatus child : dstfs.listStatus(status.getPath())) {
String relative = makeRelative(dstroot.getPath(), child.getPath());
writer.append(new Text(relative), child);
@@ -1294,20 +1311,21 @@ public class DistCp implements Tool {
checkAndClose(writer);
}
- //sort lsr results
+ // sort lsr results
final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
new Text.Comparator(), Text.class, FileStatus.class, jobconf);
sorter.sort(dstlsr, sortedlsr);
- //compare lsr list and dst list
+ // compare lsr list and dst list
+ final String jobDirStr = jobdir.toString();
SequenceFile.Reader lsrin = null;
SequenceFile.Reader dstin = null;
try {
lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf);
dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf);
- //compare sorted lsr list and sorted dst list
+ // compare sorted lsr list and sorted dst list
final Text lsrpath = new Text();
final FileStatus lsrstatus = new FileStatus();
final Text dstpath = new Text();
@@ -1317,6 +1335,12 @@ public class DistCp implements Tool {
boolean hasnext = dstin.next(dstpath, dstfrom);
for(; lsrin.next(lsrpath, lsrstatus); ) {
+ //
+ // check if lsrpath is in dst (represented here by dstsorted, which
+ // contains files and dirs to be copied from the source to destination),
+ // delete it if it doesn't exist in dst AND it's not jobDir or jobDir's
+ // ancestor.
+ //
int dst_cmp_lsr = dstpath.compareTo(lsrpath);
for(; hasnext && dst_cmp_lsr < 0; ) {
hasnext = dstin.next(dstpath, dstfrom);
@@ -1324,12 +1348,22 @@ public class DistCp implements Tool {
}
if (dst_cmp_lsr == 0) {
- //lsrpath exists in dst, skip it
+ // lsrpath exists in dst, skip it
hasnext = dstin.next(dstpath, dstfrom);
}
else {
- //lsrpath does not exist, delete it
+ // lsrpath does not exist in dst, delete it if it's not jobDir or
+ // jobDir's ancestor
String s = new Path(dstroot.getPath(),
lsrpath.toString()).toString();
+ if (needToFilterJobDir) {
+ int cmpJobDir = s.compareTo(jobDirStr);
+ if (cmpJobDir > 0) {
+ // do nothing
+ } else if (cmpJobDir == 0 || isAncestorPath(s, jobDirStr)) {
+ continue;
+ }
+ }
+
if (shellargs[1] == null || !isAncestorPath(shellargs[1], s)) {
shellargs[1] = s;
int r = 0;