Author: cdouglas
Date: Tue Feb 12 14:01:01 2008
New Revision: 627143
URL: http://svn.apache.org/viewvc?rev=627143&view=rev
Log:
HADOOP-2725. Modify distcp to avoid leaving partially copied files at
the destination after encountering an error.
Contributed by Tsz Wo (Nicholas), SZE
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=627143&r1=627142&r2=627143&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Feb 12 14:01:01 2008
@@ -63,6 +63,10 @@
HADOOP-2733. Fix compiler warnings in test code.
(Tsz Wo (Nicholas), SZE via cdouglas)
+ HADOOP-2725. Modify distcp to avoid leaving partially copied files at
+ the destination after encountering an error. (Tsz Wo (Nicholas), SZE
+ via cdouglas)
+
Release 0.16.0 - 2008-02-07
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?rev=627143&r1=627142&r2=627143&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Tue Feb 12 14:01:01 2008
@@ -757,7 +757,7 @@
}
/** Write key/value pairs to a sequence-format file. */
- public static class Writer {
+ public static class Writer implements java.io.Closeable {
/**
* A global compressor pool used to save the expensive
* construction/destruction of (possibly native) compression codecs.
@@ -1316,7 +1316,7 @@
} // BlockCompressionWriter
/** Reads key/value pairs from a sequence-format file. */
- public static class Reader {
+ public static class Reader implements java.io.Closeable {
/**
* A global decompressor pool used to save the expensive
* construction/destruction of (possibly native) decompression codecs.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?rev=627143&r1=627142&r2=627143&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Tue Feb 12 14:01:01 2008
@@ -25,17 +25,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Stack;
-import java.util.StringTokenizer;
+import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -70,8 +60,10 @@
public class CopyFiles implements Tool {
private static final Log LOG = LogFactory.getLog(CopyFiles.class);
- private static final String usage =
- "distcp [OPTIONS] <srcurl>* <desturl>" +
+ private static final String NAME = "distcp";
+
+ private static final String usage = NAME
+ + " [OPTIONS] <srcurl>* <desturl>" +
"\n\nOPTIONS:" +
"\n-p Preserve status" +
"\n-i Ignore failures" +
@@ -82,7 +74,7 @@
"\n\nNOTE: if -overwrite or -update are set, each source URI is " +
"\n interpreted as an isomorphic update to an existing directory." +
"\nFor example:" +
- "\nhadoop distcp -p -update \"hdfs://A:8020/user/foo/bar\" " +
+ "\nhadoop " + NAME + " -p -update \"hdfs://A:8020/user/foo/bar\" " +
"\"hdfs://B:8020/user/foo/baz\"\n" +
"\n would update all descendants of 'baz' also in 'bar'; it would " +
"\n *not* update /user/foo/baz/bar\n";
@@ -92,6 +84,26 @@
private static final int SYNC_FILE_MAX = 10;
static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
+ static enum Options {
+ IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
+ PRESERVE_STATUS("-p", NAME + ".preserve.status.info"),
+ OVERWRITE("-overwrite", NAME + ".overwrite.always"),
+ UPDATE("-update", NAME + ".overwrite.ifnewer");
+
+ final String cmd, propertyname;
+
+ private Options(String cmd, String propertyname) {
+ this.cmd = cmd;
+ this.propertyname = propertyname;
+ }
+ }
+
+ static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
+ static final String DST_DIR_LABEL = NAME + ".dest.path";
+ static final String JOB_DIR_LABEL = NAME + ".job.dir";
+ static final String SRC_LIST_LABEL = NAME + ".src.list";
+ static final String SRC_COUNT_LABEL = NAME + ".src.count";
+ static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
private JobConf conf;
@@ -107,9 +119,6 @@
return conf;
}
- @Deprecated
- public CopyFiles() { }
-
public CopyFiles(Configuration conf) {
setConf(conf);
}
@@ -139,7 +148,7 @@
* InputFormat of a distcp job responsible for generating splits of the src
* file list.
*/
- static class CopyInputFormat implements InputFormat {
+ static class CopyInputFormat implements InputFormat<Text, Text> {
/**
* Does nothing.
@@ -154,9 +163,9 @@
*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
- int cnfiles = job.getInt("distcp.file.count", -1);
- long cbsize = job.getLong("distcp.total.size", -1);
- String srcfilelist = job.get("distcp.src.list", "");
+ int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
+ long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
+ String srcfilelist = job.get(SRC_LIST_LABEL, "");
if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
") total_size(" + cbsize + ") listuri(" +
@@ -197,23 +206,13 @@
/**
* Returns a reader for this split of the src file list.
*/
- public RecordReader getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
- return new SequenceFileRecordReader(job, (FileSplit)split);
+ public RecordReader<Text, Text> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split);
}
}
/**
- * Return true if dst should be replaced by src and the update flag is set.
- * Right now, this merely checks that the src and dst len are not equal. This
- * should be improved on once modification times, CRCs, etc. can
- * be meaningful in this context.
- */
- private static boolean needsUpdate(FileStatus src, FileStatus dst) {
- return src.getLen() != dst.getLen();
- }
-
- /**
* FSCopyFilesMapper: The mapper for copying files between FileSystems.
*/
public static class FSCopyFilesMapper
@@ -240,6 +239,15 @@
" Failed: " + failcount);
}
+ /**
+ * Return true if dst should be replaced by src and the update flag is set.
+ * Right now, this merely checks that the src and dst len are not equal.
+ * This should be improved on once modification times, CRCs, etc. can
+ * be meaningful in this context.
+ */
+ private boolean needsUpdate(FileStatus src, FileStatus dst) {
+ return update && src.getLen() != dst.getLen();
+ }
/**
* Copy a file to a destination.
@@ -247,17 +255,22 @@
* @param dstpath dst path
* @param reporter
*/
- private void copy(FileStatus srcstat, Path dstpath,
+ private void copy(FileStatus srcstat, Path relativedst,
OutputCollector<WritableComparable, Text> outc, Reporter reporter)
throws IOException {
-
- int totfiles = job.getInt("distcp.file.count", -1);
+ Path absdst = new Path(destPath, relativedst);
+ int totfiles = job.getInt(SRC_COUNT_LABEL, -1);
assert totfiles >= 0 : "Invalid file count " + totfiles;
// if a directory, ensure created even if empty
if (srcstat.isDir()) {
- if (!destFileSys.mkdirs(dstpath)) {
- throw new IOException("Failed to create" + dstpath);
+ if (destFileSys.exists(absdst)) {
+ if (!destFileSys.getFileStatus(absdst).isDir()) {
+ throw new IOException("Failed to mkdirs: " + absdst+" is a file.");
+ }
+ }
+ else if (!destFileSys.mkdirs(absdst)) {
+ throw new IOException("Failed to mkdirs " + absdst);
}
// TODO: when modification times can be set, directories should be
// emitted to reducers so they might be preserved. Also, mkdirs does
@@ -265,70 +278,87 @@
// if this changes, all directory work might as well be done in reduce
return;
}
- Path destParent = dstpath.getParent();
- if (totfiles > 1) {
- // create directories to hold destination file
- if (destParent != null && !destFileSys.mkdirs(destParent)) {
- throw new IOException("mkdirs failed to create " + destParent);
- }
- } else {
- // Copying a single file; use dst path provided by user as destination
- // rather than destination directory
- dstpath = destParent;
+
+ if (destFileSys.exists(absdst) && !overwrite
+ && !needsUpdate(srcstat, destFileSys.getFileStatus(absdst))) {
+ outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
+ ++skipcount;
+ reporter.incrCounter(Counter.SKIP, 1);
+ updateStatus(reporter);
+ return;
}
+ Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
long cbcopied = 0L;
FSDataInputStream in = null;
FSDataOutputStream out = null;
try {
- if (destFileSys.exists(dstpath)
- && (!overwrite && !(update
- && needsUpdate(srcstat, destFileSys.getFileStatus(dstpath))))) {
- outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
- ++skipcount;
- reporter.incrCounter(Counter.SKIP, 1);
- updateStatus(reporter);
- return;
- }
// open src file
in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
- final long cblen = srcstat.getLen();
- reporter.incrCounter(Counter.BYTESEXPECTED, cblen);
- // open dst file
+ reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
+ // open tmp file
out = preserve_status
- ? destFileSys.create(dstpath, true, sizeBuf,
srcstat.getReplication(),
+ ? destFileSys.create(tmpfile, true, sizeBuf,
srcstat.getReplication(),
srcstat.getBlockSize(), reporter)
- : destFileSys.create(dstpath, reporter);
+ : destFileSys.create(tmpfile, reporter);
// copy file
int cbread;
while ((cbread = in.read(buffer)) >= 0) {
out.write(buffer, 0, cbread);
cbcopied += cbread;
- reporter.setStatus(pcntfmt.format(100.0 * cbcopied / cblen) +
- " " + dstpath + " [ " +
+ reporter.setStatus(pcntfmt.format(100.0 * cbcopied /
srcstat.getLen())
+ + " " + absdst + " [ " +
StringUtils.humanReadableInt(cbcopied) + " / " +
- StringUtils.humanReadableInt(cblen) + " ]");
- }
- if (cbcopied != cblen) {
- final String badlen = "ERROR? copied " + cbcopied + " bytes (" +
- StringUtils.humanReadableInt(cbcopied) + ") expected " +
- cblen + " bytes (" + StringUtils.humanReadableInt(cblen) +
- ") from " + srcstat.getPath();
- LOG.warn(badlen);
- outc.collect(null, new Text(badlen));
+ StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
}
} finally {
- if (in != null)
- in.close();
- if (out != null)
- out.close();
+ checkAndClose(in);
+ checkAndClose(out);
}
+
+ final boolean success = cbcopied == srcstat.getLen();
+ if (!success) {
+ final String badlen = "ERROR? copied " + bytesString(cbcopied)
+ + " but expected " + bytesString(srcstat.getLen())
+ + " from " + srcstat.getPath();
+ LOG.warn(badlen);
+ outc.collect(null, new Text(badlen));
+ }
+ else {
+ if (totfiles == 1) {
+ // Copying a single file; use dst path provided by user as
destination
+ // rather than destination directory
+ absdst = absdst.getParent();
+ }
+ rename(destFileSys, tmpfile, absdst);
+ }
+
// report at least once for each file
++copycount;
reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
reporter.incrCounter(Counter.COPY, 1);
updateStatus(reporter);
}
+
+ /** rename tmp to dst, delete dst if already exists */
+ private void rename(FileSystem fs, Path tmp, Path dst) throws IOException {
+ try {
+ if (fs.exists(dst)) {
+ fs.delete(dst);
+ }
+ fs.rename(tmp, dst);
+ }
+ catch(IOException cause) {
+ IOException ioe = new IOException("Fail to rename tmp file (=" + tmp
+ + ") to destination file (=" + dst + ")");
+ ioe.initCause(cause);
+ throw ioe;
+ }
+ }
+
+ static String bytesString(long b) {
+ return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
+ }
/** Mapper configuration.
* Extracts source and destination file system, as well as
@@ -337,7 +367,7 @@
*/
public void configure(JobConf job)
{
- destPath = new Path(job.get("copy.dest.path", "/"));
+ destPath = new Path(job.get(DST_DIR_LABEL, "/"));
try {
destFileSys = destPath.getFileSystem(job);
} catch (IOException ex) {
@@ -345,10 +375,10 @@
}
sizeBuf = job.getInt("copy.buf.size", 128 * 1024);
buffer = new byte[sizeBuf];
- ignoreReadFailures = job.getBoolean("distcp.ignore.read.failures",
false);
- preserve_status = job.getBoolean("distcp.preserve.status.info", false);
- update = job.getBoolean("distcp.overwrite.ifnewer", false);
- overwrite = !update && job.getBoolean("distcp.overwrite.always", false);
+ ignoreReadFailures =
job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
+ preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname,
false);
+ update = job.getBoolean(Options.UPDATE.propertyname, false);
+ overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname,
false);
this.job = job;
}
@@ -438,27 +468,15 @@
} else {
tmp.add(src);
}
- EnumSet<cpOpts> flags = ignoreReadFailures
- ? EnumSet.of(cpOpts.IGNORE_READ_FAILURES)
- : EnumSet.noneOf(cpOpts.class);
+ EnumSet<Options> flags = ignoreReadFailures
+ ? EnumSet.of(Options.IGNORE_READ_FAILURES)
+ : EnumSet.noneOf(Options.class);
copy(conf, tmp, new Path(destPath), logPath, flags);
}
- /**
- * Driver to copy srcPath to destPath depending on required protocol.
- * @param srcPaths list of source paths
- * @param destPath Destination path
- * @param logPath Log output directory
- * @param flags Command-line flags
- */
- public static void copy(Configuration conf, List<Path> srcPaths,
- Path destPath, Path logPath,
- EnumSet<cpOpts> flags) throws IOException {
- //Job configuration
- JobConf job = new JobConf(conf, CopyFiles.class);
- job.setJobName("distcp");
-
- //Sanity check for srcPath/destPath
+ /** Sanity check for srcPath */
+ private static void checkSrcPath(Configuration conf, List<Path> srcPaths
+ ) throws IOException {
List<IOException> rslt = new ArrayList<IOException>();
for (Path p : srcPaths) {
FileSystem fs = p.getFileSystem(conf);
@@ -469,21 +487,35 @@
if (!rslt.isEmpty()) {
throw new InvalidInputException(rslt);
}
+ }
+ /**
+ * Driver to copy srcPath to destPath depending on required protocol.
+ * @param srcPaths list of source paths
+ * @param destPath Destination path
+ * @param logPath Log output directory
+ * @param flags Command-line flags
+ */
+ public static void copy(Configuration conf, List<Path> srcPaths,
+ Path destPath, Path logPath,
+ EnumSet<Options> flags) throws IOException {
+ LOG.info("srcPaths=" + srcPaths);
+ LOG.info("destPath=" + destPath);
+ checkSrcPath(conf, srcPaths);
+
+ JobConf job = createJobConf(conf);
//Initialize the mapper
try {
setup(conf, job, srcPaths, destPath, logPath, flags);
JobClient.runJob(job);
} finally {
- cleanup(conf, job);
+ //delete tmp
+ fullyDelete(job.get(TMP_DIR_LABEL), job);
+ //delete jobDirectory
+ fullyDelete(job.get(JOB_DIR_LABEL), job);
}
}
- enum cpOpts { IGNORE_READ_FAILURES,
- PRESERVE_STATUS,
- OVERWRITE,
- UPDATE }
-
/**
* This is the main driver for recursively copying directories
* across file systems. It takes at least two cmdline parameters. A source
@@ -496,25 +528,23 @@
List<Path> srcPath = new ArrayList<Path>();
Path destPath = null;
Path logPath = null;
- EnumSet<cpOpts> flags = EnumSet.noneOf(cpOpts.class);
+ EnumSet<Options> flags = EnumSet.noneOf(Options.class);
for (int idx = 0; idx < args.length; idx++) {
- if ("-i".equals(args[idx])) {
- flags.add(cpOpts.IGNORE_READ_FAILURES);
- } else if ("-p".equals(args[idx])) {
- flags.add(cpOpts.PRESERVE_STATUS);
- } else if ("-overwrite".equals(args[idx])) {
- flags.add(cpOpts.OVERWRITE);
- } else if ("-update".equals(args[idx])) {
- flags.add(cpOpts.UPDATE);
- } else if ("-f".equals(args[idx])) {
+ Options[] opt = Options.values();
+ int i = 0;
+ for(; i < opt.length && !opt[i].cmd.equals(args[idx]); i++);
+
+ if (i < opt.length) {
+ flags.add(opt[i]);
+ }
+ else if ("-f".equals(args[idx])) {
if (++idx == args.length) {
System.out.println("urilist_uri not specified");
System.out.println(usage);
return -1;
}
srcPath.addAll(fetchFileList(conf, new Path(args[idx])));
-
} else if ("-log".equals(args[idx])) {
if (++idx == args.length) {
System.out.println("logdir not specified");
@@ -541,13 +571,16 @@
return -1;
}
// incompatible command-line flags
- if (flags.contains(cpOpts.OVERWRITE) && flags.contains(cpOpts.UPDATE)) {
+ if (flags.contains(Options.OVERWRITE) && flags.contains(Options.UPDATE)) {
System.out.println("Conflicting overwrite policies");
System.out.println(usage);
return -1;
}
try {
copy(conf, srcPath, destPath, logPath, flags);
+ } catch (DuplicationException e) {
+ System.err.println(StringUtils.stringifyException(e));
+ return DuplicationException.ERROR_CODE;
} catch (Exception e) {
System.err.println("With failures, global counters are inaccurate; " +
"consider running with -i");
@@ -607,22 +640,50 @@
numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
return Math.max(numMaps, 1);
}
- /**
- * Delete the temporary dir containing the src file list.
- * @param conf The dfs/mapred configuration
- * @param jobConf The handle to the jobConf object
- */
- private static void cleanup(Configuration conf, JobConf jobConf)
- throws IOException {
- //Clean up jobDirectory
- String jobDirName = jobConf.get("distdp.job.dir");
- if (jobDirName != null) {
- Path jobDirectory = new Path(jobDirName);
- FileSystem fs = jobDirectory.getFileSystem(jobConf);
- FileUtil.fullyDelete(fs, jobDirectory);
+
+ /** Fully delete dir */
+ static void fullyDelete(String dir, Configuration conf) throws IOException {
+ if (dir != null) {
+ Path tmp = new Path(dir);
+ FileUtil.fullyDelete(tmp.getFileSystem(conf), tmp);
}
}
+ //Job configuration
+ private static JobConf createJobConf(Configuration conf) {
+ JobConf jobconf = new JobConf(conf, CopyFiles.class);
+ jobconf.setJobName(NAME);
+
+ // turn off speculative execution, because DFS doesn't handle
+ // multiple writers to the same file.
+ jobconf.setMapSpeculativeExecution(false);
+
+ jobconf.setInputFormat(CopyInputFormat.class);
+ jobconf.setOutputKeyClass(Text.class);
+ jobconf.setOutputValueClass(Text.class);
+
+ jobconf.setMapperClass(FSCopyFilesMapper.class);
+ jobconf.setNumReduceTasks(0);
+ return jobconf;
+ }
+
+ private static final Random RANDOM = new Random();
+ private static String getRandomId() {
+ return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
+ }
+
+ private static boolean setBooleans(JobConf jobConf, EnumSet<Options> flags) {
+ boolean update = flags.contains(Options.UPDATE);
+ boolean overwrite = !update && flags.contains(Options.OVERWRITE);
+ jobConf.setBoolean(Options.UPDATE.propertyname, update);
+ jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
+ jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
+ flags.contains(Options.IGNORE_READ_FAILURES));
+ jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
+ flags.contains(Options.PRESERVE_STATUS));
+ return update || overwrite;
+ }
+
/**
* Initialize DFSCopyFileMapper specific job-configuration.
* @param conf : The dfs/mapred configuration.
@@ -633,46 +694,27 @@
* @param flags : Command-line flags
*/
private static void setup(Configuration conf, JobConf jobConf,
- List<Path> srcPaths, Path destPath,
- Path logPath, EnumSet<cpOpts> flags)
+ List<Path> srcPaths, final Path destPath,
+ Path logPath, EnumSet<Options> flags)
throws IOException {
- boolean update;
- boolean overwrite;
- jobConf.set("copy.dest.path", destPath.toUri().toString());
-
- // turn off speculative execution, because DFS doesn't handle
- // multiple writers to the same file.
- jobConf.setSpeculativeExecution(false);
-
- jobConf.setInputFormat(CopyInputFormat.class);
+ jobConf.set(DST_DIR_LABEL, destPath.toUri().toString());
+ final boolean updateORoverwrite = setBooleans(jobConf, flags);
- jobConf.setOutputKeyClass(Text.class);
- jobConf.setOutputValueClass(Text.class);
+ final String randomId = getRandomId();
+ Path jobDirectory = new Path(jobConf.getSystemDir(), NAME + "_" +
randomId);
+ jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
- jobConf.setMapperClass(FSCopyFilesMapper.class);
-
- jobConf.setNumReduceTasks(0);
- jobConf.setBoolean("distcp.ignore.read.failures",
- flags.contains(cpOpts.IGNORE_READ_FAILURES));
- jobConf.setBoolean("distcp.preserve.status.info",
- flags.contains(cpOpts.PRESERVE_STATUS));
- jobConf.setBoolean("distcp.overwrite.ifnewer",
- update = flags.contains(cpOpts.UPDATE));
- jobConf.setBoolean("distcp.overwrite.always",
- overwrite = !update && flags.contains(cpOpts.OVERWRITE));
-
- Random r = new Random();
- String randomId = Integer.toString(r.nextInt(Integer.MAX_VALUE), 36);
- Path jobDirectory = new Path(jobConf.getSystemDir(), "distcp_" + randomId);
- jobConf.set("distcp.job.dir", jobDirectory.toString());
- Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
- jobConf.set("distcp.src.list", srcfilelist.toString());
+ FileSystem dstfs = destPath.getFileSystem(conf);
+ boolean dstExists = dstfs.exists(destPath);
+ boolean dstIsDir = false;
+ if (dstExists) {
+ dstIsDir = dstfs.getFileStatus(destPath).isDir();
+ }
// default logPath
- FileSystem dstfs = destPath.getFileSystem(conf);
if (logPath == null) {
String filename = "_distcp_logs_" + randomId;
- if (!dstfs.exists(destPath) || !dstfs.getFileStatus(destPath).isDir()) {
+ if (!dstExists || !dstIsDir) {
Path parent = destPath.getParent();
dstfs.mkdirs(parent);
logPath = new Path(parent, filename);
@@ -681,70 +723,139 @@
}
}
jobConf.setOutputPath(logPath);
+
+ // create src list, dst list
+ FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
+
+ Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
+ jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
+ SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf,
+ srcfilelist, LongWritable.class, FilePair.class,
+ SequenceFile.CompressionType.NONE);
- // create src list
- SequenceFile.Writer writer = SequenceFile.createWriter(
- jobDirectory.getFileSystem(jobConf), jobConf, srcfilelist,
- LongWritable.class, FilePair.class,
+ Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
+ SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf,
+ dstfilelist, Text.class, Text.class,
SequenceFile.CompressionType.NONE);
- int cnfiles = 0;
- long cbsize = 0L;
+ // handle the case where the destination directory doesn't exist
+ // and we've only a single src directory OR we're updating/overwriting
+ // the contents of the destination directory.
+ final boolean special =
+ (srcPaths.size() == 1 && !dstExists) || updateORoverwrite;
+ int srcCount = 0, cnsyncf = 0;
+ long cbsize = 0L, cbsyncs = 0L;
try {
- // handle the case where the destination directory doesn't exist
- // and we've only a single src directory OR we're updating/overwriting
- // the contents of the destination directory.
- final boolean special_case =
- (srcPaths.size() == 1 && !dstfs.exists(destPath))
- || update || overwrite;
- int cnsyncf = 0;
- long cbsyncs = 0L;
for (Path p : srcPaths) {
- Path root = p.getParent();
FileSystem fs = p.getFileSystem(conf);
-
- if (special_case && fs.getFileStatus(p).isDir()) {
- root = p;
+ boolean pIsDir = fs.getFileStatus(p).isDir();
+ Path root = special && pIsDir? p: p.getParent();
+ if (pIsDir) {
+ ++srcCount;
}
Stack<Path> pathstack = new Stack<Path>();
pathstack.push(p);
while (!pathstack.empty()) {
for (FileStatus stat : fs.listStatus(pathstack.pop())) {
+ ++srcCount;
+
if (stat.isDir()) {
pathstack.push(stat.getPath());
- } else {
+ }
+ else {
++cnsyncf;
cbsyncs += stat.getLen();
- ++cnfiles;
cbsize += stat.getLen();
+
+ if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
+ src_writer.sync();
+ dst_writer.sync();
+ cnsyncf = 0;
+ cbsyncs = 0L;
+ }
}
- if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
- writer.sync();
- cnsyncf = 0;
- cbsyncs = 0L;
- }
- writer.append(new LongWritable(stat.isDir() ? 0 : stat.getLen()),
- new FilePair(stat, new Path(destPath,
- makeRelative(root, stat.getPath()))));
+
+ Path dst = makeRelative(root, stat.getPath());
+ src_writer.append(new LongWritable(stat.isDir()? 0: stat.getLen()),
+ new FilePair(stat, dst));
+ dst_writer.append(new Text(dst.toString()),
+ new Text(stat.getPath().toString()));
}
}
}
} finally {
- writer.close();
+ checkAndClose(src_writer);
+ checkAndClose(dst_writer);
}
// create dest path dir if copying > 1 file
- if (cnfiles > 1 && !dstfs.mkdirs(destPath)) {
- throw new IOException("Failed to create" + destPath);
- }
+ if (!dstfs.exists(destPath)) {
+ if (srcCount > 1 && !dstfs.mkdirs(destPath)) {
+ throw new IOException("Failed to create" + destPath);
+ }
+ }
+
+ checkDuplication(jobfs, dstfilelist,
+ new Path(jobDirectory, "_distcp_sorted"), conf);
+
+ Path tmpDir = new Path(
+ (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
+ destPath.getParent(): destPath, "_distcp_tmp_" + randomId);
+ jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
+ LOG.info("srcCount=" + srcCount);
+ jobConf.setInt(SRC_COUNT_LABEL, srcCount);
+ jobConf.setLong(TOTAL_SIZE_LABEL, cbsize);
+ jobConf.setNumMapTasks(getMapCount(cbsize,
+ new JobClient(jobConf).getClusterStatus().getTaskTrackers()));
+ }
- jobConf.setInt("distcp.file.count", cnfiles);
- jobConf.setLong("distcp.total.size", cbsize);
+ static private void checkDuplication(FileSystem fs, Path file, Path sorted,
+ Configuration conf) throws IOException {
+ SequenceFile.Reader in = null;
+ try {
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
+ new Text.Comparator(), Text.class, conf);
+ sorter.sort(file, sorted);
+ in = new SequenceFile.Reader(fs, sorted, conf);
+
+ Text prevdst = null, curdst = new Text();
+ Text prevsrc = null, cursrc = new Text();
+ for(; in.next(curdst, cursrc); ) {
+ if (prevdst != null && curdst.equals(prevdst)) {
+ throw new DuplicationException(
+ "Invalid input, there are duplicated files in the sources: "
+ + prevsrc + ", " + cursrc);
+ }
+ prevdst = curdst;
+ curdst = new Text();
+ prevsrc = cursrc;
+ cursrc = new Text();
+ }
+ }
+ finally {
+ checkAndClose(in);
+ }
+ }
- JobClient client = new JobClient(jobConf);
- jobConf.setNumMapTasks(getMapCount(cbsize,
- client.getClusterStatus().getTaskTrackers()));
+ static boolean checkAndClose(java.io.Closeable io) {
+ if (io != null) {
+ try {
+ io.close();
+ }
+ catch(IOException ioe) {
+ LOG.warn(StringUtils.stringifyException(ioe));
+ return false;
+ }
+ }
+ return true;
}
+ /** An exception class for duplicated source files. */
+ public static class DuplicationException extends IOException {
+ private static final long serialVersionUID = 1L;
+ /** Error code for this exception */
+ public static final int ERROR_CODE = -2;
+ DuplicationException(String message) {super(message);}
+ }
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java?rev=627143&r1=627142&r2=627143&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/fs/TestCopyFiles.java Tue Feb 12 14:01:01 2008
@@ -35,6 +35,8 @@
*/
public class TestCopyFiles extends TestCase {
+ static final URI LOCAL_FS = URI.create("file:///");
+
private static final int NFILES = 20;
private static String TEST_ROOT_DIR =
new Path(System.getProperty("test.build.data","/tmp"))
@@ -84,53 +86,33 @@
long getSeed() { return seed; }
}
- public TestCopyFiles(String testName) {
- super(testName);
- }
-
-
-
- @Override
- protected void setUp() throws Exception {
- }
-
- @Override
- protected void tearDown() throws Exception {
- }
-
/** create NFILES with random names and directory hierarchies
* with random (but reproducible) data in them.
*/
- private static MyFile[] createFiles(String fsname, String topdir)
+ private static MyFile[] createFiles(URI fsname, String topdir)
throws IOException {
- MyFile[] files = new MyFile[NFILES];
-
- for (int idx = 0; idx < NFILES; idx++) {
- files[idx] = new MyFile();
- }
-
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.getNamed(fsname, conf);
+ FileSystem fs = FileSystem.get(fsname, new Configuration());
Path root = new Path(topdir);
-
- for (int idx = 0; idx < NFILES; idx++) {
- Path fPath = new Path(root, files[idx].getName());
- if (!fs.mkdirs(fPath.getParent())) {
- throw new IOException("Mkdirs failed to create " +
- fPath.getParent().toString());
- }
- FSDataOutputStream out = fs.create(fPath);
- byte[] toWrite = new byte[files[idx].getSize()];
- Random rb = new Random(files[idx].getSeed());
- rb.nextBytes(toWrite);
- out.write(toWrite);
- out.close();
- toWrite = null;
+
+ MyFile[] files = new MyFile[NFILES];
+ for (int i = 0; i < NFILES; i++) {
+ files[i] = createFile(root, fs);
}
-
return files;
}
+ static MyFile createFile(Path root, FileSystem fs) throws IOException {
+ MyFile f = new MyFile();
+ Path p = new Path(root, f.getName());
+ FSDataOutputStream out = fs.create(p);
+ byte[] toWrite = new byte[f.getSize()];
+ new Random(f.getSeed()).nextBytes(toWrite);
+ out.write(toWrite);
+ out.close();
+ FileSystem.LOG.info("created: " + p + ", size=" + f.getSize());
+ return f;
+ }
+
/** check if the files have been copied correctly. */
private static boolean checkFiles(String fsname, String topdir, MyFile[]
files)
throws IOException {
@@ -139,7 +121,7 @@
FileSystem fs = FileSystem.getNamed(fsname, conf);
Path root = new Path(topdir);
- for (int idx = 0; idx < NFILES; idx++) {
+ for (int idx = 0; idx < files.length; idx++) {
Path fPath = new Path(root, files[idx].getName());
FSDataInputStream in = fs.open(fPath);
byte[] toRead = new byte[files[idx].getSize()];
@@ -231,7 +213,7 @@
/** copy files from local file system to local file system */
public void testCopyFromLocalToLocal() throws Exception {
- MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
+ MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
ToolRunner.run(new CopyFiles(new Configuration()),
new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
"file:///"+TEST_ROOT_DIR+"/destdat"});
@@ -250,7 +232,7 @@
cluster = new MiniDFSCluster(conf, 2, true, null);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
- MyFile[] files = createFiles(namenode, "/srcdat");
+ MyFile[] files = createFiles(URI.create("hdfs://"+namenode),
"/srcdat");
ToolRunner.run(new CopyFiles(conf), new String[] {
"-log",
"hdfs://"+namenode+"/logs",
@@ -279,7 +261,7 @@
cluster = new MiniDFSCluster(conf, 1, true, null);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
- MyFile[] files = createFiles("local", TEST_ROOT_DIR+"/srcdat");
+ MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
ToolRunner.run(new CopyFiles(conf), new String[] {
"-log",
"hdfs://"+namenode+"/logs",
@@ -308,7 +290,7 @@
cluster = new MiniDFSCluster(conf, 1, true, null);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
- MyFile[] files = createFiles(namenode, "/srcdat");
+ MyFile[] files = createFiles(URI.create("hdfs://"+namenode),
"/srcdat");
ToolRunner.run(new CopyFiles(conf), new String[] {
"-log",
"/logs",
@@ -336,7 +318,7 @@
cluster = new MiniDFSCluster(conf, 2, true, null);
namenode = conf.get("fs.default.name", "local");
if (!"local".equals(namenode)) {
- MyFile[] files = createFiles(namenode, "/srcdat");
+ MyFile[] files = createFiles(URI.create("hdfs://"+namenode),
"/srcdat");
ToolRunner.run(new CopyFiles(conf), new String[] {
"-p",
"-log",
@@ -388,4 +370,54 @@
}
}
+ public void testCopyDuplication() throws Exception {
+ try {
+ MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
+ ToolRunner.run(new CopyFiles(new Configuration()),
+ new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+ "file:///"+TEST_ROOT_DIR+"/src2/srcdat"});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles("local", TEST_ROOT_DIR+"/src2/srcdat", files));
+
+ assertEquals(CopyFiles.DuplicationException.ERROR_CODE,
+ ToolRunner.run(new CopyFiles(new Configuration()),
+ new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+ "file:///"+TEST_ROOT_DIR+"/src2/srcdat",
+ "file:///"+TEST_ROOT_DIR+"/destdat",}));
+ }
+ finally {
+ deldir("local", TEST_ROOT_DIR+"/destdat");
+ deldir("local", TEST_ROOT_DIR+"/srcdat");
+ deldir("local", TEST_ROOT_DIR+"/src2");
+ }
+ }
+
+ public void testCopySingleFile() throws Exception {
+ FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration());
+ Path root = new Path(TEST_ROOT_DIR+"/srcdat");
+ try {
+ MyFile[] files = {createFile(root, fs)};
+ //copy a dir with a single file
+ ToolRunner.run(new CopyFiles(new Configuration()),
+ new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
+ "file:///"+TEST_ROOT_DIR+"/destdat"});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles("local", TEST_ROOT_DIR+"/destdat", files));
+
+ //copy a single file
+ String fname = files[0].getName();
+ Path p = new Path(root, fname);
+ FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
+ ToolRunner.run(new CopyFiles(new Configuration()),
+ new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname,
+ "file:///"+TEST_ROOT_DIR+"/dest2/"+fname});
+ assertTrue("Source and destination directories do not match.",
+ checkFiles("local", TEST_ROOT_DIR+"/dest2", files));
+ }
+ finally {
+ deldir("local", TEST_ROOT_DIR+"/destdat");
+ deldir("local", TEST_ROOT_DIR+"/dest2");
+ deldir("local", TEST_ROOT_DIR+"/srcdat");
+ }
+ }
}