Author: cutting Date: Fri Jun 22 15:47:47 2007 New Revision: 549975 URL: http://svn.apache.org/viewvc?view=rev&rev=549975 Log: HADOOP-1292. Change 'bin/hadoop fs -get' to first copy files to a temporary name, then rename them to their final name, so that failures don't leave partial files. Contributed by Tsz Wo Sze.
Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=549975&r1=549974&r2=549975 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Fri Jun 22 15:47:47 2007 @@ -242,6 +242,10 @@ 74. HADOOP-1518. Add a session id to job metrics, for use by HOD. (David Bowen via cutting) + 75. HADOOP-1292. Change 'bin/hadoop fs -get' to first copy files to + a temporary name, then rename them to their final name, so that + failures don't leave partial files. 
(Tsz Wo Sze via cutting) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=549975&r1=549974&r2=549975 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Fri Jun 22 15:47:47 2007 @@ -65,7 +65,7 @@ int port = uri.getPort(); this.dfs = new DFSClient(new InetSocketAddress(host, port), conf); this.uri = URI.create("hdfs://"+host+":"+port); - this.localFs = getNamed("file:///", conf); + this.localFs = getLocal(conf); } public Path getWorkingDirectory() { Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?view=diff&rev=549975&r1=549974&r2=549975 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Fri Jun 22 15:47:47 2007 @@ -567,8 +567,8 @@ @Override public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException { - FileSystem localFs = getNamed("file:///", getConf()); - FileUtil.copy(localFs, src, this, dst, delSrc, getConf()); + Configuration conf = getConf(); + FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf); } /** @@ -578,8 +578,8 @@ @Override public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException { - FileSystem localFs = getNamed("file:///", getConf()); - FileUtil.copy(this, src, localFs, dst, delSrc, getConf()); + Configuration conf = getConf(); + FileUtil.copy(this, src, getLocal(conf), 
dst, delSrc, conf); } /** @@ -592,7 +592,7 @@ throws IOException { if (!fs.isDirectory(src)) { // source is a file fs.copyToLocalFile(src, dst); - FileSystem localFs = getNamed("file:///", getConf()); + FileSystem localFs = getLocal(getConf()); if (localFs instanceof ChecksumFileSystem) { localFs = ((ChecksumFileSystem) localFs).getRawFileSystem(); } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=549975&r1=549974&r2=549975 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Fri Jun 22 15:47:47 2007 @@ -194,9 +194,6 @@ public static boolean copy(FileSystem srcFS, Path src, File dst, boolean deleteSource, Configuration conf) throws IOException { - - dst = checkDest(src.getName(), dst); - if (srcFS.isDirectory(src)) { if (!dst.mkdirs()) { return false; @@ -431,5 +428,27 @@ String cmd = "chmod " + perm + " " + filename; Process p = Runtime.getRuntime().exec(cmd, null); return p.waitFor(); + } + + /** + * Create a tmp file for a base file. 
+ * @param basefile the base file of the tmp + * @param prefix file name prefix of tmp + * @param isDeleteOnExit if true, the tmp will be deleted when the VM exits + * @return a newly created tmp file + * @exception IOException If a tmp file cannot created + * @see java.io.File#createTempFile(String, String, File) + * @see java.io.File#deleteOnExit() + */ + public static final File createLocalTempFile(final File basefile, + final String prefix, + final boolean isDeleteOnExit) + throws IOException { + File tmp = File.createTempFile(prefix + basefile.getName(), + "", basefile.getParentFile()); + if (isDeleteOnExit) { + tmp.deleteOnExit(); + } + return tmp; } } Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?view=diff&rev=549975&r1=549974&r2=549975 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Fri Jun 22 15:47:47 2007 @@ -21,7 +21,6 @@ import java.util.*; import org.apache.hadoop.conf.*; -import org.apache.hadoop.dfs.DistributedFileSystem; import org.apache.hadoop.ipc.*; import org.apache.hadoop.util.ToolBase; @@ -138,18 +137,86 @@ } cat(srcf); } else { - Path [] srcs = fs.globPaths(new Path(srcf)); - if (srcs.length > 1 && !new File(dstf).isDirectory()) { - throw new IOException("When copying multiple files, " - + "destination should be a directory."); + copyToLocal(fs, new Path(srcf), new File(dstf), copyCrc); + } + } + + /** + * The prefix for the tmp file used in copyToLocal. + * It must be at least three characters long, required by + * {@link java.io.File#createTempFile(String, String, File)}. + */ + static final String COPYTOLOCAL_PREFIX = "_copyToLocal_"; + + /** + * Copy a source file from a given file system to local destination. 
+ * @param srcFS source file system + * @param src source path + * @param dst destination + * @param copyCrc copy CRC files? + * @exception IOException If some IO failed + */ + private void copyToLocal(final FileSystem srcFS, final Path src, + final File dst, final boolean copyCrc) + throws IOException { + if (srcFS.isDirectory(src)) { //src is a directory + dst.mkdir(); + if (!dst.isDirectory()) { + throw new IOException("cannot create directory for local destination \"" + + dst + "\"."); + } + for(Path p : srcFS.listPaths(src)) { + copyToLocal(srcFS, p, + srcFS.isDirectory(p)? new File(dst, p.getName()): dst, copyCrc); } - Path dst = new Path(dstf); - for(int i=0; i<srcs.length; i++) { - ((DistributedFileSystem)fs).copyToLocalFile(srcs[i], dst, copyCrc); + } + else { + Path [] srcs = srcFS.globPaths(src); + if (dst.isDirectory()) { //dst is a directory but src is not + for (Path p : srcs) { + copyToLocal(srcFS, p, new File(dst, p.getName()), copyCrc); + } + } else if (srcs.length == 1) + { + if (dst.exists()) { + throw new IOException("local destination \"" + dst + + "\" already exists."); + } + if (!srcFS.exists(src)) { + throw new IOException("src \"" + src + "\" does not exist."); + } + + File tmp = FileUtil.createLocalTempFile(dst, COPYTOLOCAL_PREFIX, true); + if (FileUtil.copy(srcFS, src, tmp, false, srcFS.getConf())) { + if (!tmp.renameTo(dst)) { + //try to reanme tmp to another file since tmp will be deleted on exit + File another = FileUtil.createLocalTempFile(dst, COPYTOLOCAL_PREFIX, + false); + another.delete(); + if (tmp.renameTo(another)) { + throw new IOException( + "Failed to rename tmp file to local destination \"" + dst + + "\". 
Remote source file \"" + src + "\" is saved to \"" + + another + "\"."); + } else { + throw new IOException("Failed to rename tmp file."); + } + } + } + + if (copyCrc) { + ChecksumFileSystem csfs = (ChecksumFileSystem) srcFS; + File dstcs = FileSystem.getLocal(srcFS.getConf()) + .pathToFile(csfs.getChecksumFile(new Path(dst.getCanonicalPath()))); + copyToLocal(srcFS, csfs.getChecksumFile(src), dstcs, false); + } + } else { + throw new IOException("When copying multiple files, " + + "destination should be a directory."); } } } - + /** * Get all the files in the directories that match the source file * pattern and merge and sort them to only one file on local fs Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java?view=diff&rev=549975&r1=549974&r2=549975 ============================================================================== --- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java (original) +++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java Fri Jun 22 15:47:47 2007 @@ -36,10 +36,97 @@ private void writeFile(FileSystem fileSys, Path name) throws IOException { DataOutputStream stm = fileSys.create(name); - stm.writeBytes("dhruba"); + stm.writeBytes("dhruba: " + name); stm.close(); } - + + public void testCopyToLocal() throws IOException { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null); + FileSystem fs = cluster.getFileSystem(); + assertTrue("Not a HDFS: "+fs.getUri(), + fs instanceof DistributedFileSystem); + DistributedFileSystem dfs = (DistributedFileSystem)fs; + FsShell shell = new FsShell(); + shell.setConf(conf); + + try { + { + // create a tree + // ROOT + // |- f1 + // |- f2 + // + sub + // |- f3 + // |- f4 + Path root = new Path("/test/copyToLocal"); + assertTrue(dfs.mkdirs(root)); + assertTrue(dfs.exists(root)); + 
assertTrue(dfs.isDirectory(root)); + + Path sub = new Path(root, "sub"); + assertTrue(dfs.mkdirs(sub)); + assertTrue(dfs.exists(sub)); + assertTrue(dfs.isDirectory(sub)); + + Path f1 = new Path(root, "f1"); + writeFile(dfs, f1); + assertTrue(dfs.exists(f1)); + + Path f2 = new Path(root, "f2"); + writeFile(dfs, f2); + assertTrue(dfs.exists(f2)); + + Path f3 = new Path(sub, "f3"); + writeFile(dfs, f3); + assertTrue(dfs.exists(f3)); + + Path f4 = new Path(sub, "f4"); + writeFile(dfs, f4); + assertTrue(dfs.exists(f4)); + } + + + // Verify copying the tree + { + String[] args = {"-copyToLocal", "/test/copyToLocal", TEST_ROOT_DIR}; + try { + assertEquals(0, shell.run(args)); + } catch (Exception e) { + System.err.println("Exception raised from DFSShell.run " + + e.getLocalizedMessage()); + } + + File f1 = new File(TEST_ROOT_DIR, "f1"); + assertTrue("Copying failed.", f1.isFile()); + + File f2 = new File(TEST_ROOT_DIR, "f2"); + assertTrue("Copying failed.", f2.isFile()); + + File sub = new File(TEST_ROOT_DIR, "sub"); + assertTrue("Copying failed.", sub.isDirectory()); + + File f3 = new File(sub, "f3"); + assertTrue("Copying failed.", f3.exists()); + + File f4 = new File(sub, "f4"); + assertTrue("Copying failed.", f4.exists()); + + f1.delete(); + f2.delete(); + f3.delete(); + f4.delete(); + sub.delete(); + } + } finally { + try { + dfs.close(); + } catch (Exception e) { + } + cluster.shutdown(); + } + } + /** * Tests various options of DFSShell. */