Author: dhruba
Date: Tue Jul 31 16:57:48 2007
New Revision: 561603

URL: http://svn.apache.org/viewvc?view=rev&rev=561603
Log:
HADOOP-999. A HDFS Client immediately informs the NameNode of a new
file creation. ClientProtocol version changed from 14 to 15.
(Tsz Wo (Nicholas), SZE via dhruba)
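In short, the patch moves first-block allocation out of file creation: create() now only registers the new file with the NameNode, and the client obtains every block, the first one included, through the same path it already used for subsequent blocks. A minimal sketch of the two protocol contracts, not the committed source: LocatedBlock is reduced to a stub, and the addBlock(src, clientName) signature is an assumption implied by the reportWrittenBlock()/addBlock() javadoc and locateFollowingBlock() in the diff below.

    import java.io.IOException;

    /** Stub for org.apache.hadoop.dfs.LocatedBlock (a block plus its datanode targets). */
    class LocatedBlock {}

    /** v14 contract: create() registered the file AND allocated/returned its first block. */
    interface ClientProtocolV14 {
      LocatedBlock create(String src, String clientName, boolean overwrite,
                          short replication, long blockSize) throws IOException;
    }

    /** v15 contract (this patch): create() only records the new file at the NameNode. */
    interface ClientProtocolV15 {
      void create(String src, String clientName, boolean overwrite,
                  short replication, long blockSize) throws IOException;

      /** All blocks, first included, are now requested separately (cf. locateFollowingBlock()). */
      LocatedBlock addBlock(String src, String clientName) throws IOException;
    }

Because the NameNode now learns about the file as soon as the output stream is constructed, a second writer racing to create the same path with overwrite=false fails immediately rather than succeeding until the first block is written; that is the behavior the new TestDFSShell.testPut exercises.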
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 31 16:57:48 2007
@@ -5,6 +5,10 @@
 
 INCOMPATIBLE CHANGES
 
+    HADOOP-999. A HDFS Client immediately informs the NameNode of a new
+    file creation. ClientProtocol version changed from 14 to 15.
+    (Tsz Wo (Nicholas), SZE via dhruba)
+
 NEW FEATURES
 
     HADOOP-1636. Allow configuration of the number of jobs kept in

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Tue Jul 31 16:57:48 2007
@@ -31,9 +31,9 @@
 
   /**
    * Compared to the previous version the following changes have been introduced:
-   * 14: distributedUpgradeProgress() added.
+   * 15: create(...) should only create a file but not return block.
    */
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
 
   ///////////////////////////////////////
   // File contents
@@ -90,7 +90,7 @@
    * create multi-block files must also use reportWrittenBlock()
    * and addBlock().
   */
-  public LocatedBlock create(String src, 
+  public void create(String src, 
                              String clientName, 
                              boolean overwrite, 
                              short replication,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Jul 31 16:57:48 2007
@@ -1396,9 +1396,7 @@
     boolean closed = false;
 
     private UTF8 src;
-    private boolean overwrite;
     private short replication;
-    private boolean firstTime = true;
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
     private File backupFile;
@@ -1421,7 +1419,6 @@
                 ) throws IOException {
       super(new CRC32(), conf.getInt("io.bytes.per.checksum", 512), 4);
       this.src = src;
-      this.overwrite = overwrite;
       this.replication = replication;
       this.blockSize = blockSize;
       this.buffersize = buffersize;
@@ -1441,6 +1438,8 @@
 
       checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
                                               bytesPerChecksum);
+      namenode.create(
+          src.toString(), clientName, overwrite, replication, blockSize);
     }
 
     private void openBackupStream() throws IOException {
@@ -1494,13 +1493,7 @@
 
       do {
         retry = false;
-        LocatedBlock lb;
-        if (firstTime) {
-          lb = locateNewBlock();
-        } else {
-          lb = locateFollowingBlock(startTime);
-        }
-
+        LocatedBlock lb = locateFollowingBlock(startTime);
         block = lb.getBlock();
         if (block.getNumBytes() < bytesWrittenToBlock) {
           block.setNumBytes(bytesWrittenToBlock);
@@ -1524,12 +1517,7 @@
               Thread.sleep(6000);
             } catch (InterruptedException iex) {
             }
-            if (firstTime) {
-              namenode.abandonFileInProgress(src.toString(), 
-                                             clientName);
-            } else {
-              namenode.abandonBlock(block, src.toString());
-            }
+            namenode.abandonBlock(block, src.toString());
             retry = true;
             continue;
           }
@@ -1549,14 +1537,8 @@
         blockStream = out;
         blockReplyStream = new DataInputStream(s.getInputStream());
       } while (retry);
-      firstTime = false;
     }
 
-    private LocatedBlock locateNewBlock() throws IOException {
-      return namenode.create(src.toString(), clientName,
-                             overwrite, replication, blockSize);
-    }
-
     private LocatedBlock locateFollowingBlock(long start
                                               ) throws IOException {
       int retries = 5;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue Jul 31 16:57:48 2007
@@ -709,54 +709,7 @@
    * @throws IOException if the filename is invalid
    *         {@link FSDirectory#isValidToCreate(String)}.
   */
-  public LocatedBlock startFile(String src, 
-                                String holder, 
-                                String clientMachine, 
-                                boolean overwrite, 
-                                short replication, 
-                                long blockSize
-                                ) throws IOException {
-
-    //
-    // Create file into pendingCreates and get the first blockId
-    //
-    Block newBlock = startFileInternal(src, holder, clientMachine,
-                                       overwrite, replication,
-                                       blockSize);
-
-    //
-    // Get the array of replication targets
-    //
-    try {
-      DatanodeDescriptor clientNode = 
-        host2DataNodeMap.getDatanodeByHost(clientMachine);
-      DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
-                                                             clientNode, null, blockSize);
-      if (targets.length < this.minReplication) {
-        if (clusterMap.getNumOfLeaves() == 0) {
-          throw new IOException("Failed to create file " + src
-                                + " on client " + clientMachine
-                                + " because this cluster has no datanodes.");
-        }
-        throw new IOException("Failed to create file " + src
-                              + " on client " + clientMachine
-                              + " because there were not enough datanodes available. "
-                              + "Found " + targets.length
-                              + " datanodes but MIN_REPLICATION for the cluster is "
-                              + "configured to be "
-                              + this.minReplication
-                              + ".");
-      }
-      return new LocatedBlock(newBlock, targets, 0L);
-
-    } catch (IOException ie) {
-      NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
-                                   + ie.getMessage());
-      throw ie;
-    }
-  }
-
-  public synchronized Block startFileInternal(String src, 
+  synchronized void startFile(String src, 
                                               String holder, 
                                               String clientMachine, 
                                               boolean overwrite,
@@ -861,9 +814,6 @@
         }
         lease.startedCreate(src);
       }
-
-      // Create first block
-      return allocateBlock(src);
     } catch (IOException ie) {
       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
                                    +ie.getMessage());

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Tue Jul 31 16:57:48 2007
@@ -291,7 +291,7 @@
 
   /**
    */
-  public LocatedBlock create(String src, 
+  public void create(String src, 
                              String clientName, 
                              boolean overwrite,
                              short replication,
@@ -304,14 +304,9 @@
       throw new IOException("create: Pathname too long.  Limit " 
                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-    LocatedBlock result = namesystem.startFile(src, 
-                                               clientName, 
-                                               clientMachine, 
-                                               overwrite, 
-                                               replication, 
-                                               blockSize);
+    namesystem.startFile(
+        src, clientName, clientMachine, overwrite, replication, blockSize);
     myMetrics.createFile();
-    return result;
   }
 
   public boolean setReplication(String src, 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Tue Jul 31 16:57:48 2007
@@ -249,7 +249,15 @@
    * Files are overwritten by default.
    */
   public FSDataOutputStream create(Path f) throws IOException {
-    return create(f, true, 
+    return create(f, true);
+  }
+
+  /**
+   * Opens an FSDataOutputStream at the indicated Path.
+   */
+  public FSDataOutputStream create(Path f, boolean overwrite)
+    throws IOException {
+    return create(f, overwrite, 
                   getConf().getInt("io.file.buffer.size", 4096),
                   getDefaultReplication(),
                   getDefaultBlockSize());
@@ -773,7 +781,19 @@
    */
   public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
     throws IOException {
-    FileUtil.copy(getLocal(getConf()), src, this, dst, delSrc, getConf());
+    copyFromLocalFile(delSrc, true, src, dst);
+  }
+
+  /**
+   * The src file is on the local disk.  Add it to FS at
+   * the given dst name.
+   * delSrc indicates if the source should be removed
+   */
+  public void copyFromLocalFile(boolean delSrc, boolean overwrite, 
+                                Path src, Path dst)
+    throws IOException {
+    Configuration conf = getConf();
+    FileUtil.copy(getLocal(conf), src, this, dst, delSrc, overwrite, conf);
   }
 
   /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Tue Jul 31 16:57:48 2007
@@ -114,6 +114,15 @@
                              FileSystem dstFS, Path dst, 
                              boolean deleteSource,
                              Configuration conf) throws IOException {
+    return copy(srcFS, src, dstFS, dst, deleteSource, true, conf);
+  }
+
+  /** Copy files between FileSystems. */
+  public static boolean copy(FileSystem srcFS, Path src, 
+                             FileSystem dstFS, Path dst, 
+                             boolean deleteSource,
+                             boolean overwrite,
+                             Configuration conf) throws IOException {
     dst = checkDest(src.getName(), dstFS, dst);
 
     if (srcFS.isDirectory(src)) {
@@ -124,12 +133,12 @@
       Path contents[] = srcFS.listPaths(src);
       for (int i = 0; i < contents.length; i++) {
         copy(srcFS, contents[i], dstFS, new Path(dst, contents[i].getName()),
-             deleteSource, conf);
+             deleteSource, overwrite, conf);
       }
     } else if (srcFS.isFile(src)) {
       InputStream in = srcFS.open(src);
       try {
-        OutputStream out = dstFS.create(dst);
+        OutputStream out = dstFS.create(dst, overwrite);
         copyContent(in, out, conf);
       } finally {
         in.close();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FsShell.java Tue Jul 31 16:57:48 2007
@@ -109,7 +109,7 @@
     if (src.toString().equals("-")) {
       copyFromStdin(new Path(dstf));
     } else {
-      fs.copyFromLocalFile(src, new Path(dstf));
+      fs.copyFromLocalFile(false, false, src, new Path(dstf));
     }
   }
 

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java?view=diff&rev=561603&r1=561602&r2=561603
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDFSShell.java Tue Jul 31 16:57:48 2007
@@ -19,13 +19,13 @@
 import junit.framework.TestCase;
 import java.io.*;
+import java.security.*;
+import java.util.*;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.util.StringUtils;
-
 /**
  * This class tests commands from DFSShell.
  */
@@ -33,11 +33,141 @@
   private static String TEST_ROOT_DIR =
     new Path(System.getProperty("test.build.data","/tmp"))
     .toString().replace(' ', '+');
+
+  static private Path writeFile(FileSystem fs, Path f) throws IOException {
+    DataOutputStream out = fs.create(f);
+    out.writeBytes("dhruba: " + f);
+    out.close();
+    assertTrue(fs.exists(f));
+    return f;
+  }
+
+  static private Path mkdir(FileSystem fs, Path p) throws IOException {
+    assertTrue(fs.mkdirs(p));
+    assertTrue(fs.exists(p));
+    assertTrue(fs.getFileStatus(p).isDir());
+    return p;
+  }
+
+  static private File createLocalFile(File f) throws IOException {
+    assertTrue(!f.exists());
+    PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(f)));
+    out.println(f.getAbsolutePath());
+    out.close();
+    assertTrue(f.exists());
+    assertTrue(f.isFile());
+    return f;
+  }
+
+  static void show(String s) {
+    System.out.println(Thread.currentThread().getStackTrace()[2] + " " + s);
+  }
+
+  public void testZeroSizeFile() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+               fs instanceof DistributedFileSystem);
+    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+    try {
+      //create a zero size file
+      final File f1 = new File(TEST_ROOT_DIR, "f1");
+      assertTrue(!f1.exists());
+      assertTrue(f1.createNewFile());
+      assertTrue(f1.exists());
+      assertTrue(f1.isFile());
+      assertEquals(0L, f1.length());
+
+      //copy to remote
+      final Path root = mkdir(dfs, new Path("/test/zeroSizeFile"));
+      final Path remotef = new Path(root, "dst");
+      show("copy local " + f1 + " to remote " + remotef);
+      dfs.copyFromLocalFile(false, false, new Path(f1.getPath()), remotef);
+
+      //getBlockSize() should not throw exception
+      show("Block size = " + dfs.getFileStatus(remotef).getBlockSize());
+
+      //copy back
+      final File f2 = new File(TEST_ROOT_DIR, "f2");
+      assertTrue(!f2.exists());
+      dfs.copyToLocalFile(remotef, new Path(f2.getPath()));
+      assertTrue(f2.exists());
+      assertTrue(f2.isFile());
+      assertEquals(0L, f2.length());
-  private void writeFile(FileSystem fileSys, Path name) throws IOException {
-    DataOutputStream stm = fileSys.create(name);
-    stm.writeBytes("dhruba: " + name);
-    stm.close();
+      f1.delete();
+      f2.delete();
+    } finally {
+      try {dfs.close();} catch (Exception e) {}
+      cluster.shutdown();
+    }
+  }
+
+  public void testPut() throws IOException {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+               fs instanceof DistributedFileSystem);
+    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+    try {
+      final File f1 = createLocalFile(new File(TEST_ROOT_DIR, "f1"));
+      final File f2 = createLocalFile(new File(TEST_ROOT_DIR, "f2"));
+
+      final Path root = mkdir(dfs, new Path("/test/put"));
+      final Path dst = new Path(root, "dst");
+
+      show("begin");
+
+      final Thread copy2ndFileThread = new Thread() {
+        public void run() {
+          try {
+            show("copy local " + f2 + " to remote " + dst);
+            dfs.copyFromLocalFile(false, false, new Path(f2.getPath()), dst);
+          } catch (IOException ioe) {
+            show("good " + StringUtils.stringifyException(ioe));
+            return;
+          }
+          //should not be here, must got IOException
+          assertTrue(false);
+        }
+      };
+
+      //use SecurityManager to pause the copying of f1 and begin copying f2
+      System.setSecurityManager(new SecurityManager() {
+        private boolean firstTime = true;
+
+        public void checkPermission(Permission perm) {
+          if (firstTime) {
+            Thread t = Thread.currentThread();
+            if (!t.toString().contains("DataNode")) {
+              String s = "" + Arrays.asList(t.getStackTrace());
+              if (s.contains("FileUtil.copyContent")) {
+                //pause at FileUtil.copyContent
+
+                firstTime = false;
+                copy2ndFileThread.start();
+                try {Thread.sleep(5000);} catch (InterruptedException e) {}
+              }
+            }
+          }
+        }
+      });
+      show("copy local " + f1 + " to remote " + dst);
+      dfs.copyFromLocalFile(false, false, new Path(f1.getPath()), dst);
+      show("done");
+
+      try {copy2ndFileThread.join();} catch (InterruptedException e) { }
+      System.setSecurityManager(null);
+      f1.delete();
+      f2.delete();
+    } finally {
+      try {dfs.close();} catch (Exception e) {}
+      cluster.shutdown();
+    }
   }
 
   public void testCopyToLocal() throws IOException {
@@ -63,33 +193,14 @@
       //     + sub
       //        |- f3
       //        |- f4
-      Path root = new Path("/test/copyToLocal");
-      assertTrue(dfs.mkdirs(root));
-      assertTrue(dfs.exists(root));
-      assertTrue(dfs.isDirectory(root));
-
-      Path sub = new Path(root, "sub");
-      assertTrue(dfs.mkdirs(sub));
-      assertTrue(dfs.exists(sub));
-      assertTrue(dfs.isDirectory(sub));
-
-      Path f1 = new Path(root, "f1");
-      writeFile(dfs, f1);
-      assertTrue(dfs.exists(f1));
-
-      Path f2 = new Path(root, "f2");
-      writeFile(dfs, f2);
-      assertTrue(dfs.exists(f2));
-
-      Path f3 = new Path(sub, "f3");
-      writeFile(dfs, f3);
-      assertTrue(dfs.exists(f3));
-
-      Path f4 = new Path(sub, "f4");
-      writeFile(dfs, f4);
-      assertTrue(dfs.exists(f4));
-    }
+      Path root = mkdir(dfs, new Path("/test/copyToLocal"));
+      Path sub = mkdir(dfs, new Path(root, "sub"));
+      writeFile(fs, new Path(root, "f1"));
+      writeFile(fs, new Path(root, "f2"));
+      writeFile(fs, new Path(sub, "f3"));
+      writeFile(fs, new Path(sub, "f4"));
+    }
 
     // Verify copying the tree
     {
@@ -111,10 +222,10 @@
       assertTrue("Copying failed.", sub.isDirectory());
 
       File f3 = new File(sub, "f3");
-      assertTrue("Copying failed.", f3.exists());
+      assertTrue("Copying failed.", f3.isFile());
 
       File f4 = new File(sub, "f4");
-      assertTrue("Copying failed.", f4.exists());
+      assertTrue("Copying failed.", f4.isFile());
 
       f1.delete();
      f2.delete();
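The user-visible surface of the change is the pair of overwrite-aware overloads added to FileSystem. A minimal usage sketch, assuming a reachable cluster and hypothetical paths (illustrative, not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PutNoOverwrite {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Hypothetical paths, for illustration only.
        Path src = new Path("/tmp/local-file.txt");
        Path dst = new Path("/user/demo/dst");

        // New overload from this commit: delSrc=false, overwrite=false.
        // With overwrite=false the copy throws an IOException if dst
        // already exists, which is what FsShell's "put" now relies on.
        fs.copyFromLocalFile(false, false, src, dst);

        // The parallel create(Path, boolean) overload behaves the same way:
        // create(dst, false) fails if dst exists; create(dst) still overwrites.
      }
    }

Note the design choice visible in the diff: the pre-existing entry points delegate with overwrite=true, so FileSystem.create(Path) and copyFromLocalFile(boolean, Path, Path) keep their old overwriting behavior, while FsShell's put switches to overwrite=false so an existing destination is reported as an error.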