Author: dhruba
Date: Wed Feb 13 21:35:14 2008
New Revision: 627663
URL: http://svn.apache.org/viewvc?rev=627663&view=rev
Log:
HADOOP-2345. New HDFS transactions to support appending
to files. Disk layout version changed from -11 to -12. (dhruba)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Feb 13 21:35:14 2008
@@ -7,6 +7,9 @@
HADOOP-2786. Move hbase out of hadoop core
+ HADOOP-2345. New HDFS transactions to support appending
+ to files. Disk layout version changed from -11 to -12. (dhruba)
+
NEW FEATURES
HADOOP-1398. Add HBase in-memory block cache. (tomwhite)
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Wed Feb 13 21:35:14 2008
@@ -35,9 +35,9 @@
* Compared to the previous version the following changes have been introduced:
* (Only the latest change is reflected.
* The log of historical changes can be retrieved from the svn).
- * 23 : added setOwner(...) and setPermission(...); changed create(...) and mkdir(...)
+ * 24 : added fsync
*/
- public static final long versionID = 23L;
+ public static final long versionID = 24L;
///////////////////////////////////////
// File contents
@@ -437,4 +437,12 @@
* @return size of directory subtree in bytes
*/
public long getContentLength(String src) throws IOException;
+
+ /**
+ * Write all metadata for this file into persistent storage.
+ * The file must be currently open for writing.
+ * @param src The string representation of the path
+ * @param clientName The string representation of the client
+ */
+ public void fsync(String src, String clientName) throws IOException;
}
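
For illustration, a minimal sketch of how a caller can reach the new fsync RPC. It mirrors the DFSClient idiom used in TestFileCreation below; the class name, path, and holder string are hypothetical, and it assumes code living in the org.apache.hadoop.dfs package (as the tests do) so that DFSClient.namenode is visible.

package org.apache.hadoop.dfs;

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;

class FsyncSketch {
  // Flush all namenode-side metadata for a file that this client
  // currently has open for writing.
  static void flushMetadata(InetSocketAddress namenodeAddr,
                            String path, String holder) throws Exception {
    Configuration conf = new Configuration();
    DFSClient client = new DFSClient(namenodeAddr, conf);
    // The file must be leased by 'holder'; otherwise the namenode's
    // checkLease() rejects the call with an IOException.
    client.namenode.fsync(path, holder);
  }
}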
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Feb 13 21:35:14 2008
@@ -183,7 +183,7 @@
// Version is reflected in the data storage file.
// Versions are negative.
// Decrement LAYOUT_VERSION to define a new version.
- public static final int LAYOUT_VERSION = -11;
+ public static final int LAYOUT_VERSION = -12;
// Current version:
- // Added permission information to INode.
+ // Introduce OPEN, CLOSE and GENSTAMP transactions for supporting appends
}
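
Readers gate every new on-disk field on the (negative, decreasing) layout version, as FSImage does for the generation stamp further down. A minimal sketch of that pattern, with assumed names:

import java.io.DataInputStream;
import java.io.IOException;

class LayoutGateSketch {
  static final int LAYOUT_VERSION = -12;  // decrement to define a new version

  // The generation stamp first appears on disk in layout -12.
  static long readGenerationStamp(DataInputStream in, int imgVersion)
      throws IOException {
    if (imgVersion <= -12) {      // image written by a -12 or newer namenode
      return in.readLong();
    }
    return 1000;                  // assumed default; valid stamps start at 1000
  }
}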
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Wed Feb 13 21:35:14 2008
@@ -124,7 +124,8 @@
long preferredBlockSize,
String clientName,
String clientMachine,
- DatanodeDescriptor clientNode)
+ DatanodeDescriptor clientNode,
+ long generationStamp)
throws IOException {
waitForReady();
@@ -134,7 +135,8 @@
modTime)) {
return null;
}
- INodeFile newNode = new INodeFileUnderConstruction(permissions,replication,
+ INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
+ permissions,replication,
preferredBlockSize, modTime, clientName,
clientMachine, clientNode);
synchronized (rootDir) {
@@ -153,8 +155,10 @@
+" to the file system");
return null;
}
- // add create file record to log
- fsImage.getEditLog().logCreateFile(path, newNode);
+ // add create file record to log, record new generation stamp
+ fsImage.getEditLog().logOpenFile(path, newNode);
+ fsImage.getEditLog().logGenerationStamp(generationStamp);
+
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+path+" is added to the file system");
return newNode;
@@ -171,9 +175,9 @@
INode newNode;
if (blocks == null)
newNode = new INodeDirectory(permissions, modificationTime);
- else
+ else
newNode = new INodeFile(permissions, blocks.length, replication,
- modificationTime, preferredBlockSize);
+ modificationTime, preferredBlockSize);
synchronized (rootDir) {
try {
newNode = rootDir.addNode(path, newNode);
@@ -220,20 +224,30 @@
/**
* Persist the block list for the inode.
*/
- void persistBlocks(String path, INode file) throws IOException {
+ void persistBlocks(String path, INodeFileUnderConstruction file)
+ throws IOException {
waitForReady();
synchronized (rootDir) {
- INodeFile fileNode = (INodeFile) file;
+ fsImage.getEditLog().logOpenFile(path, file);
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
+ +path+" with "+ file.getBlocks().length
+ +" blocks is persisted to the file
system");
+ }
+ }
- // create two transactions. The first one deletes the empty
- // file and the second transaction recreates the same file
- // with the appropriate set of blocks.
- fsImage.getEditLog().logDelete(path, fileNode.getModificationTime());
+ /**
+ * Close file.
+ */
+ void closeFile(String path, INode file) throws IOException {
+ waitForReady();
- // re-add create file record to log
- fsImage.getEditLog().logCreateFile(path, fileNode);
- NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+ synchronized (rootDir) {
+ INodeFile fileNode = (INodeFile) file;
+
+ // file is closed
+ fsImage.getEditLog().logCloseFile(path, fileNode);
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
+path+" with "+
fileNode.getBlocks().length
+" blocks is persisted to the file
system");
}
@@ -242,26 +256,20 @@
/**
* Remove a block from the file.
*/
- boolean removeBlock(String path, INode file, Block block) throws IOException {
+ boolean removeBlock(String path, INodeFileUnderConstruction fileNode,
+ Block block) throws IOException {
waitForReady();
synchronized (rootDir) {
- INodeFile fileNode = (INodeFile) file;
- if (fileNode == null) {
- throw new IOException("Unknown file: " + path);
- }
-
// modify file-> block and blocksMap
fileNode.removeBlock(block);
namesystem.blocksMap.removeINode(block);
- // create two transactions. The first one deletes the empty
- // file and the second transaction recreates the same file
- // with the appropriate set of blocks.
- fsImage.getEditLog().logDelete(path, fileNode.getModificationTime());
+ // Remove the block locations for the last block.
+ fileNode.setLastBlockLocations(new DatanodeDescriptor[0]);
- // re-add create file record to log
- fsImage.getEditLog().logCreateFile(path, fileNode);
+ // write modified block locations to log
+ fsImage.getEditLog().logOpenFile(path, fileNode);
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
+path+" with "+block
+" block is added to the file system");
@@ -433,20 +441,22 @@
/**
* Remove the file from management, return blocks
*/
- public Block[] delete(String src) {
+ public INode delete(String src, Collection<Block> deletedBlocks) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "
+src);
waitForReady();
long now = FSNamesystem.now();
- Block[] blocks = unprotectedDelete(src, now);
- if (blocks != null)
+ INode deletedNode = unprotectedDelete(src, now, deletedBlocks);
+ if (deletedNode != null) {
fsImage.getEditLog().logDelete(src, now);
- return blocks;
+ }
+ return deletedNode;
}
/**
*/
- Block[] unprotectedDelete(String src, long modificationTime) {
+ INode unprotectedDelete(String src, long modificationTime,
+ Collection<Block> deletedBlocks) {
synchronized (rootDir) {
INode targetNode = rootDir.getNode(src);
if (targetNode == null) {
@@ -472,8 +482,11 @@
totalInodes -= filesRemoved;
for (Block b : v) {
namesystem.blocksMap.removeINode(b);
+ if (deletedBlocks != null) {
+ deletedBlocks.add(b);
+ }
}
- return v.toArray(new Block[v.size()]);
+ return targetNode;
}
}
}
@@ -567,7 +580,6 @@
* Get {@link INode} associated with the file.
*/
INodeFile getFileINode(String src) {
- waitForReady();
synchronized (rootDir) {
INode inode = rootDir.getNode(src);
if (inode == null || inode.isDirectory())
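
The net effect in FSDirectory: persisting the block list of an open file is now one OP_ADD record instead of the old delete-then-recreate transaction pair, and closing a file writes a separate OP_CLOSE record. A toy model of that record stream (names and shapes illustrative, not the real FSEditLog API):

import java.util.ArrayList;
import java.util.List;

class EditRecordSketch {
  static final byte OP_ADD = 0;    // open-for-write, carries current block list
  static final byte OP_CLOSE = 9;  // close after write

  static class Record {
    final byte opcode; final String path; final long[] blockIds;
    Record(byte opcode, String path, long[] blockIds) {
      this.opcode = opcode; this.path = path; this.blockIds = blockIds;
    }
  }

  private final List<Record> log = new ArrayList<Record>();

  void persistBlocks(String path, long[] blockIds) {
    log.add(new Record(OP_ADD, path, blockIds));    // one record, not two
  }

  void closeFile(String path, long[] blockIds) {
    log.add(new Record(OP_CLOSE, path, blockIds));
  }
}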
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Wed Feb 13 21:35:14 2008
@@ -39,15 +39,17 @@
*/
class FSEditLog {
private static final byte OP_ADD = 0;
- private static final byte OP_RENAME = 1;
- private static final byte OP_DELETE = 2;
- private static final byte OP_MKDIR = 3;
- private static final byte OP_SET_REPLICATION = 4;
- //the following two are used only for backword compatibility :
+ private static final byte OP_RENAME = 1; // rename
+ private static final byte OP_DELETE = 2; // delete
+ private static final byte OP_MKDIR = 3; // create directory
+ private static final byte OP_SET_REPLICATION = 4; // set replication
+ //the following two are used only for backward compatibility :
@Deprecated private static final byte OP_DATANODE_ADD = 5;
@Deprecated private static final byte OP_DATANODE_REMOVE = 6;
private static final byte OP_SET_PERMISSIONS = 7;
private static final byte OP_SET_OWNER = 8;
+ private static final byte OP_CLOSE = 9; // close after write
+ private static final byte OP_SET_GENSTAMP = 10; // store genstamp
private static int sizeFlushBuffer = 512*1024;
private ArrayList<EditLogOutputStream> editStreams = null;
@@ -377,6 +379,10 @@
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
int logVersion = 0;
+ INode old = null;
+ String clientName = null;
+ String clientMachine = null;
+ DatanodeDescriptor lastLocations[] = null;
if (edits != null) {
DataInputStream in = new DataInputStream(
@@ -420,8 +426,10 @@
}
numEdits++;
switch (opcode) {
- case OP_ADD: {
+ case OP_ADD:
+ case OP_CLOSE: {
UTF8 name = new UTF8();
+ String path = null;
ArrayWritable aw = null;
Writable writables[];
// version 0 does not support per file replication
@@ -441,6 +449,7 @@
writables.length + ". ");
}
name = (UTF8) writables[0];
+ path = name.toString();
replication = Short.parseShort(
((UTF8)writables[1]).toString());
replication = adjustReplication(replication);
@@ -470,16 +479,77 @@
blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
}
}
+
PermissionStatus permissions = fsNamesys.getUpgradePermission();
if (logVersion <= -11) {
permissions = PermissionStatus.read(in);
}
+ // clientname, clientMachine and block locations of last block.
+ clientName = null;
+ clientMachine = null;
+ if (opcode == OP_ADD && logVersion <= -12) {
+ UTF8 uu = new UTF8();
+ UTF8 cl = new UTF8();
+ aw = new ArrayWritable(DatanodeDescriptor.class);
+ uu.readFields(in);
+ cl.readFields(in);
+ aw.readFields(in);
+ clientName = uu.toString();
+ clientMachine = cl.toString();
+ writables = aw.get();
+ lastLocations = new DatanodeDescriptor[writables.length];
+ System.arraycopy(writables, 0, lastLocations, 0, writables.length);
+ }
+
+ // The open lease transaction re-creates a file if necessary.
+ // Delete the file if it already exists.
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(opcode + ": " + name.toString() +
+ " numblocks : " + blocks.length +
+ " clientHolder " +
+ ((clientName != null) ? clientName : "") +
+ " clientMachine " +
+ ((clientMachine != null) ? clientMachine : ""));
+ }
+
+ old = fsDir.unprotectedDelete(path, mtime, null);
+
// add to the file tree
- fsDir.unprotectedAddFile(name.toString(), permissions,
- blocks, replication, mtime, blockSize);
+ INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+ path, permissions,
+ blocks, replication,
+ mtime, blockSize);
+ if (opcode == OP_ADD) {
+ //
+ // Replace current node with a INodeUnderConstruction.
+ // Recreate in-memory lease record.
+ //
+ INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+ INode.string2Bytes(node.getLocalName()),
+ node.getReplication(),
+ node.getModificationTime(),
+ node.getPreferredBlockSize(),
+ node.getBlocks(),
+ node.getPermissionStatus(),
+ clientName,
+ clientMachine,
+ null,
+ lastLocations);
+ fsDir.replaceNode(path, node, cons);
+ fsNamesys.addLease(path, clientName);
+ } else if (opcode == OP_CLOSE) {
+ //
+ // Remove lease if it exists.
+ //
+ if (old.isUnderConstruction()) {
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction)
+ old;
+ fsNamesys.removeLease(path, cons.getClientName());
+ }
+ }
break;
- }
+ }
case OP_SET_REPLICATION: {
UTF8 src = new UTF8();
UTF8 repl = new UTF8();
@@ -534,7 +604,11 @@
src = (UTF8) writables[0];
timestamp = Long.parseLong(((UTF8)writables[1]).toString());
}
- fsDir.unprotectedDelete(src.toString(), timestamp);
+ old = fsDir.unprotectedDelete(src.toString(), timestamp, null);
+ if (old != null && old.isUnderConstruction()) {
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction)old;
+ fsNamesys.removeLease(src.toString(), cons.getClientName());
+ }
break;
}
case OP_MKDIR: {
@@ -563,6 +637,12 @@
fsDir.unprotectedMkdir(src.toString(),permissions,false,timestamp);
break;
}
+ case OP_SET_GENSTAMP: {
+ LongWritable aw = new LongWritable();
+ aw.readFields(in);
+ fsDir.namesystem.setGenerationStamp(aw.get());
+ break;
+ }
case OP_DATANODE_ADD: {
if (logVersion > -3)
throw new IOException("Unexpected opcode " + opcode
@@ -657,7 +737,7 @@
//
// record the transactionId when new data was written to the edits log
//
- TransactionId id = (TransactionId)myTransactionId.get();
+ TransactionId id = myTransactionId.get();
id.txid = txid;
// update statistics
@@ -675,7 +755,7 @@
long syncStart = 0;
// Fetch the transactionId of this thread.
- TransactionId id = (TransactionId)myTransactionId.get();
+ TransactionId id = myTransactionId.get();
long mytxid = id.txid;
synchronized (this) {
@@ -766,9 +846,14 @@
}
/**
- * Add create file record to edit log
+ * Add open lease record to edit log.
+ * Records the block locations of the last block.
*/
- void logCreateFile(String path, INodeFile newNode) {
+ void logOpenFile(String path, INodeFileUnderConstruction newNode)
+ throws IOException {
+
+ DatanodeDescriptor[] locations = newNode.getLastBlockLocations();
+
UTF8 nameReplicationPair[] = new UTF8[] {
new UTF8(path),
FSEditLog.toLogReplication(newNode.getReplication()),
@@ -777,6 +862,24 @@
logEdit(OP_ADD,
new ArrayWritable(UTF8.class, nameReplicationPair),
new ArrayWritable(Block.class, newNode.getBlocks()),
+ newNode.getPermissionStatus(),
+ new UTF8(newNode.getClientName()),
+ new UTF8(newNode.getClientMachine()),
+ new ArrayWritable(DatanodeDescriptor.class, locations));
+ }
+
+ /**
+ * Add close lease record to edit log.
+ */
+ void logCloseFile(String path, INodeFile newNode) {
+ UTF8 nameReplicationPair[] = new UTF8[] {
+ new UTF8(path),
+ FSEditLog.toLogReplication(newNode.getReplication()),
+ FSEditLog.toLogLong(newNode.getModificationTime()),
+ FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
+ logEdit(OP_CLOSE,
+ new ArrayWritable(UTF8.class, nameReplicationPair),
+ new ArrayWritable(Block.class, newNode.getBlocks()),
newNode.getPermissionStatus());
}
@@ -833,6 +936,13 @@
new UTF8(src),
FSEditLog.toLogLong(timestamp)};
logEdit(OP_DELETE, new ArrayWritable(UTF8.class, info));
+ }
+
+ /**
+ * Add generation stamp record to edit log
+ */
+ void logGenerationStamp(long genstamp) {
+ logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
}
static UTF8 toLogReplication(short replication) {
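
The new OP_SET_GENSTAMP record is the smallest of the added transactions: an opcode followed by a LongWritable payload. A self-contained round-trip sketch (the opcode is written as one byte to match the byte-typed constants; the real log's buffering and stream management are elided):

import java.io.*;
import org.apache.hadoop.io.LongWritable;

class GenstampRecordSketch {
  static final byte OP_SET_GENSTAMP = 10;

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeByte(OP_SET_GENSTAMP);
    new LongWritable(1001).write(out);          // as logGenerationStamp() does

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray()));
    byte opcode = in.readByte();
    LongWritable stamp = new LongWritable();
    stamp.readFields(in);                       // as loadFSEdits() does
    System.out.println(opcode + " -> " + stamp.get());  // prints: 10 -> 1001
  }
}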
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Wed Feb 13 21:35:14 2008
@@ -41,6 +41,7 @@
import org.apache.hadoop.dfs.FSConstants.NodeType;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
/**
* FSImage handles checkpointing and logging of the namespace edits.
@@ -668,8 +669,9 @@
// read image version: first appeared in version -1
int imgVersion = in.readInt();
// read namespaceID: first appeared in version -2
- if (imgVersion <= -2)
+ if (imgVersion <= -2) {
this.namespaceID = in.readInt();
+ }
// read number of files
int numFiles = 0;
// version 0 does not store version #
@@ -681,6 +683,11 @@
numFiles = in.readInt();
}
this.layoutVersion = imgVersion;
+ // read in the last generation stamp.
+ if (imgVersion <= -12) {
+ long genstamp = in.readLong();
+ fsNamesys.setGenerationStamp(genstamp);
+ }
needToSave = (imgVersion != FSConstants.LAYOUT_VERSION);
@@ -737,6 +744,9 @@
// load datanode info
this.loadDatanodes(imgVersion, in);
+
+ // load Files Under Construction
+ this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);
} finally {
in.close();
}
@@ -776,8 +786,9 @@
out.writeInt(FSConstants.LAYOUT_VERSION);
out.writeInt(namespaceID);
out.writeInt(fsDir.rootDir.numItemsInTree() - 1);
+ out.writeLong(fsNamesys.getGenerationStamp());
saveImage("", fsDir.rootDir, out);
- saveDatanodes(out);
+ fsNamesys.saveFilesUnderConstruction(out);
} finally {
out.close();
}
@@ -882,21 +893,12 @@
}
}
- /**
- * Earlier version used to store all the known datanodes.
- * DFS don't store datanodes anymore.
- *
- * @param out output stream
- * @throws IOException
- */
- void saveDatanodes(DataOutputStream out) throws IOException {
- // we don't store datanodes anymore.
- out.writeInt(0);
- }
-
void loadDatanodes(int version, DataInputStream in) throws IOException {
if (version > -3) // pre datanode image version
return;
+ if (version <= -12) {
+ return; // new versions do not store the datanodes any more.
+ }
int size = in.readInt();
for(int i = 0; i < size; i++) {
DatanodeImage nodeImage = new DatanodeImage();
@@ -905,6 +907,104 @@
}
}
+ private void loadFilesUnderConstruction(int version, DataInputStream in,
+ FSNamesystem fs) throws IOException {
+
+ FSDirectory fsDir = fs.dir;
+ if (version > -12) // pre lease image version
+ return;
+ int size = in.readInt();
+
+ for (int i = 0; i < size; i++) {
+ INodeFileUnderConstruction cons = readINodeUnderConstruction(in);
+
+ // verify that file exists in namespace
+ String path = cons.getLocalName();
+ INode old = fsDir.getFileINode(path);
+ if (old == null) {
+ throw new IOException("Found lease for non-existent file " + path);
+ }
+ if (old.isDirectory()) {
+ throw new IOException("Found lease for directory " + path);
+ }
+ INodeFile oldnode = (INodeFile) old;
+ fsDir.replaceNode(path, oldnode, cons);
+ fs.addLease(path, cons.getClientName());
+ }
+ if (fs.countLease() != size) {
+ throw new IOException("Created " + size + " leases but found " +
+ fs.countLease());
+ }
+ }
+
+ // Helper function that reads in an INodeUnderConstruction
+ // from the input stream
+ //
+ static INodeFileUnderConstruction readINodeUnderConstruction(
+ DataInputStream in) throws IOException {
+ UTF8 src = new UTF8();
+ src.readFields(in);
+ byte[] name = src.getBytes();
+ short blockReplication = in.readShort();
+ long modificationTime = in.readLong();
+ long preferredBlockSize = in.readLong();
+ int numBlocks = in.readInt();
+ BlockInfo[] blocks = new BlockInfo[numBlocks];
+ for (int i = 0; i < numBlocks; i++) {
+ blocks[i].readFields(in);
+ }
+
+ PermissionStatus perm = PermissionStatus.read(in);
+ UTF8 clientName = new UTF8();
+ clientName.readFields(in);
+ UTF8 clientMachine = new UTF8();
+ clientMachine.readFields(in);
+
+ int numLocs = in.readInt();
+ DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
+ for (int i = 0; i < numLocs; i++) {
+ locations[i].readFields(in);
+ }
+
+ return new INodeFileUnderConstruction(name,
+ blockReplication,
+ modificationTime,
+ preferredBlockSize,
+ blocks,
+ perm,
+ clientName.toString(),
+ clientMachine.toString(),
+ null,
+ locations);
+
+ }
+
+ // Helper function that writes an INodeUnderConstruction
+ // into the output stream
+ //
+ static void writeINodeUnderConstruction(DataOutputStream out,
+ INodeFileUnderConstruction cons)
+ throws IOException {
+ new UTF8(cons.getAbsoluteName()).write(out);
+ out.writeShort(cons.getReplication());
+ out.writeLong(cons.getModificationTime());
+ out.writeLong(cons.getPreferredBlockSize());
+ int nrBlocks = cons.getBlocks().length;
+ out.writeInt(nrBlocks);
+ for (int i = 0; i < nrBlocks; i++) {
+ cons.getBlocks()[i].write(out);
+ }
+ cons.getPermissionStatus().write(out);
+ new UTF8(cons.getClientName()).write(out);
+ new UTF8(cons.getClientMachine()).write(out);
+
+ int numLocs = cons.getLastBlockLocations().length;
+ out.writeInt(numLocs);
+ for (int i = 0; i < numLocs; i++) {
+ cons.getLastBlockLocations()[i].write(out);
+ }
+ }
+
/**
* Moves fsimage.ckpt to fsImage and edits.new to edits
* Reopens the new edits file.
@@ -1118,4 +1218,5 @@
+ um.getUpgradeVersion() + " to current LV "
+ FSConstants.LAYOUT_VERSION + " is initialized.");
}
+
}
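
For reference, the on-disk shape of one persisted lease record, mirroring writeINodeUnderConstruction/readINodeUnderConstruction above. This sketch uses writeUTF as a stand-in for Hadoop's UTF8 writable and elides the Block, PermissionStatus, and DatanodeDescriptor bodies; all values shown are placeholders:

import java.io.DataOutputStream;
import java.io.IOException;

class LeaseImageLayoutSketch {
  static void writeOne(DataOutputStream out) throws IOException {
    out.writeUTF("/some/open/file"); // absolute path (UTF8 in the real code)
    out.writeShort(3);               // block replication
    out.writeLong(0L);               // modification time
    out.writeLong(64L << 20);        // preferred block size
    out.writeInt(0);                 // number of blocks, then each Block
    // PermissionStatus.write(out) goes here
    out.writeUTF("DFSClient_1");     // client name (lease holder)
    out.writeUTF("127.0.0.1");       // client machine
    out.writeInt(0);                 // last-block locations, then each one
  }
}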
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb 13 21:35:14 2008
@@ -38,6 +38,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.Map.Entry;
@@ -221,6 +222,12 @@
private long maxFsObjects = 0; // maximum number of fs objects
+ /**
+ * The global generation stamp for this file system.
+ * Valid values start from 1000.
+ */
+ private GenerationStamp generationStamp = new GenerationStamp(1000);
+
private long softLimit = LEASE_SOFTLIMIT_PERIOD;
private long hardLimit = LEASE_HARDLIMIT_PERIOD;
@@ -1029,27 +1036,19 @@
DatanodeDescriptor clientNode =
host2DataNodeMap.getDatanodeByHost(clientMachine);
- synchronized (sortedLeases) {
- Lease lease = getLease(holder);
- if (lease == null) {
- lease = new Lease(holder);
- putLease(holder, lease);
- sortedLeases.add(lease);
- } else {
- sortedLeases.remove(lease);
- lease.renew();
- sortedLeases.add(lease);
- }
- lease.startedCreate(src);
- }
+ addLease(src, holder);
//
// Now we can add the name to the filesystem. This file has no
// blocks associated with it.
//
checkFsObjectLimit();
+
+ // increment global generation stamp
+ long genstamp = generationStamp.nextStamp();
+
INode newNode = dir.addFile(src, permissions,
- replication, blockSize, holder, clientMachine, clientNode);
+ replication, blockSize, holder, clientMachine, clientNode, genstamp);
if (newNode == null) {
throw new IOException("DIR* NameSystem.startFile: " +
"Unable to add file to namespace.");
@@ -1106,28 +1105,30 @@
blockSize = pendingFile.getPreferredBlockSize();
clientNode = pendingFile.getClientNode();
replication = (int)pendingFile.getReplication();
- newBlock = allocateBlock(src, pendingFile);
}
+ // choose targets for the new block to be allocated.
DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
clientNode,
null,
blockSize);
if (targets.length < this.minReplication) {
- // if we could not find any targets, remove this block from file
- synchronized (this) {
- INodeFile iFile = dir.getFileINode(src);
- if (iFile != null && iFile.isUnderConstruction()) {
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
- if (pendingFile.getClientName().equals(clientName)) {
- dir.removeBlock(src, pendingFile, newBlock);
- }
- }
- }
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
minReplication);
}
+
+ // Allocate a new block and record it in the INode.
+ synchronized (this) {
+ INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+ if (!checkFileProgress(pendingFile, false)) {
+ throw new NotReplicatedYetException("Not replicated yet:" + src);
+ }
+
+ // allocate new block record block locations in INode.
+ newBlock = allocateBlock(src, pendingFile);
+ pendingFile.setLastBlockLocations(targets);
+ }
// Create next block
return new LocatedBlock(newBlock, targets, fileLength);
@@ -1143,7 +1144,7 @@
//
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+b.getBlockName()+"of file "+src);
- INode file = checkLease(src, holder);
+ INodeFileUnderConstruction file = checkLease(src, holder);
dir.removeBlock(src, file, b);
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b.getBlockName()
@@ -1234,22 +1235,13 @@
INodeFile newFile = pendingFile.convertToInodeFile();
dir.replaceNode(src, pendingFile, newFile);
- // persist block allocations for this file
- dir.persistBlocks(src, newFile);
+ // close file and persist block allocations for this file
+ dir.closeFile(src, newFile);
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
+ " blocklist persisted");
- synchronized (sortedLeases) {
- Lease lease = getLease(holder);
- if (lease != null) {
- lease.completedCreate(src);
- if (!lease.hasLocks()) {
- removeLease(holder);
- sortedLeases.remove(lease);
- }
- }
- }
+ removeLease(src, holder);
//
// REMIND - mjc - this should be done only after we wait a few secs.
@@ -1463,23 +1455,26 @@
checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
}
- Block deletedBlocks[] = dir.delete(src);
- if (deletedBlocks != null) {
- for (int i = 0; i < deletedBlocks.length; i++) {
- Block b = deletedBlocks[i];
-
- for (Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(b); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- addToInvalidates(b, node);
- NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
- + b.getBlockName() + " is added to invalidSet of "
- + node.getName());
- }
+ ArrayList<Block> deletedBlocks = new ArrayList<Block>();
+ INode old = dir.delete(src, deletedBlocks);
+ if (old == null) {
+ return false;
+ }
+ for (Block b : deletedBlocks) {
+ for (Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator(b); it.hasNext();) {
+ DatanodeDescriptor node = it.next();
+ addToInvalidates(b, node);
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
+ + b.getBlockName() + " is added to invalidSet of "
+ + node.getName());
}
}
-
- return (deletedBlocks != null);
+ if (old.isUnderConstruction()) {
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
+ removeLease(src, cons.getClientName());
+ }
+ return true;
}
/**
@@ -1586,6 +1581,24 @@
return dir.getContentLength(src);
}
+ /** Persist all metadata about this file.
+ * @param src The string representation of the path
+ * @param clientName The string representation of the client
+ * @throws IOException if path does not exist
+ */
+ void fsync(String src, String clientName) throws IOException {
+
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
+ + src + " for " + clientName);
+ synchronized (this) {
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot fsync file " + src, safeMode);
+ }
+ INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
+ dir.persistBlocks(src, pendingFile);
+ }
+ }
+
/************************************************************
* A Lease governs all the locks held by a single client.
* For each client there's a corresponding lease, whose
@@ -1689,6 +1702,10 @@
String getHolder() throws IOException {
return holder.getString();
}
+
+ Collection<StringBytesWritable> getPaths() throws IOException {
+ return creates;
+ }
}
/******************************************************
@@ -1778,8 +1795,8 @@
INodeFile newFile = pendingFile.convertToInodeFile();
dir.replaceNode(src, pendingFile, newFile);
- // persist block allocations for this file
- dir.persistBlocks(src, newFile);
+ // close file and persist block allocations for this file
+ dir.closeFile(src, newFile);
NameNode.stateChangeLog.debug("DIR* NameSystem.internalReleaseCreate: " +
src + " is no longer written to by " +
@@ -4049,7 +4066,6 @@
this.lmthread.interrupt();
}
-
public long getFilesTotal() {
return this.dir.totalInodes();
}
@@ -4121,5 +4137,92 @@
}
}
return numDead;
+ }
+
+ /**
+ * Sets the generation stamp for this filesystem
+ */
+ void setGenerationStamp(long stamp) {
+ generationStamp.setStamp(stamp);
+ }
+
+ /**
+ * Gets the generation stamp for this filesystem
+ */
+ long getGenerationStamp() {
+ return generationStamp.getStamp();
+ }
+
+ /**
+ * deletes the lease for the specified file
+ */
+ void removeLease(String src, String holder) throws IOException {
+ synchronized (sortedLeases) {
+ Lease lease = getLease(holder);
+ if (lease != null) {
+ lease.completedCreate(src);
+ if (!lease.hasLocks()) {
+ removeLease(holder);
+ sortedLeases.remove(lease);
+ }
+ }
+ }
+ }
+
+ /**
+ * Adds (or re-adds) the lease for the specified file.
+ */
+ void addLease(String src, String holder) throws IOException {
+ synchronized (sortedLeases) {
+ Lease lease = getLease(holder);
+ if (lease == null) {
+ lease = new Lease(holder);
+ putLease(holder, lease);
+ sortedLeases.add(lease);
+ } else {
+ sortedLeases.remove(lease);
+ lease.renew();
+ sortedLeases.add(lease);
+ }
+ lease.startedCreate(src);
+ }
+ }
+
+ /**
+ * Returns the number of leases currently in the system
+ */
+ int countLease() {
+ synchronized (sortedLeases) {
+ return sortedLeases.size();
+ }
+ }
+
+ /**
+ * Serializes leases
+ */
+ void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
+ synchronized (sortedLeases) {
+ out.writeInt(sortedLeases.size()); // write the size
+ for (Iterator<Lease> it = sortedLeases.iterator(); it.hasNext();) {
+ Lease lease = it.next();
+ Collection<StringBytesWritable> files = lease.getPaths();
+ for (Iterator<StringBytesWritable> i = files.iterator(); i.hasNext();){
+ String path = i.next().getString();
+
+ // verify that path exists in namespace
+ INode node = dir.getFileINode(path);
+ if (node == null) {
+ throw new IOException("saveLeases found path " + path +
+ " but no matching entry in namespace.");
+ }
+ if (!node.isUnderConstruction()) {
+ throw new IOException("saveLeases found path " + path +
+ " but is not under construction.");
+ }
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
+ FSImage.writeINodeUnderConstruction(out, cons);
+ }
+ }
+ }
}
}
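
The GenerationStamp class itself is not part of this diff; below is a minimal sketch consistent with how FSNamesystem uses it above: constructed with the first valid value (1000), nextStamp() on file creation, setStamp() when replaying OP_SET_GENSTAMP or loading an image, and getStamp() when saving one. Whether nextStamp() pre- or post-increments is an assumption here.

class GenerationStampSketch {
  private long stamp;

  GenerationStampSketch(long initialValue) {
    this.stamp = initialValue;      // valid values start from 1000
  }

  synchronized long nextStamp() {   // assumed increment-then-return
    return ++stamp;
  }

  synchronized void setStamp(long newStamp) {
    this.stamp = newStamp;
  }

  synchronized long getStamp() {
    return stamp;
  }
}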
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/INode.java Wed Feb 13 21:35:14 2008
@@ -24,10 +24,15 @@
import java.util.Arrays;
import java.util.List;
import java.io.IOException;
+import java.io.DataOutput;
+import java.io.DataInput;
+import java.io.DataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.*;
import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.UTF8;
/**
* We keep an in-memory representation of the file/block hierarchy.
@@ -67,9 +72,10 @@
}
}
- protected INode(String name, PermissionStatus permissions) {
- this(permissions, 0L);
- setLocalName(name);
+ protected INode() {
+ name = null;
+ parent = null;
+ modificationTime = 0;
}
INode(PermissionStatus permissions, long mTime) {
@@ -79,6 +85,11 @@
setPermissionStatus(permissions);
}
+ protected INode(String name, PermissionStatus permissions) {
+ this(permissions, 0L);
+ setLocalName(name);
+ }
+
/** Set the {@link PermissionStatus} */
protected void setPermissionStatus(PermissionStatus ps) {
setUser(ps.getUserName());
@@ -145,6 +156,13 @@
this.name = string2Bytes(name);
}
+ /**
+ * Set local file name
+ */
+ void setLocalName(byte[] name) {
+ this.name = name;
+ }
+
/** {@inheritDoc} */
public String toString() {
return "\"" + getLocalName() + "\":" + getPermissionStatus();
@@ -570,7 +588,7 @@
class INodeFile extends INode {
static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
- private BlockInfo blocks[] = null;
+ protected BlockInfo blocks[] = null;
protected short blockReplication;
protected long preferredBlockSize;
@@ -581,6 +599,12 @@
modificationTime, preferredBlockSize);
}
+ protected INodeFile() {
+ blocks = null;
+ blockReplication = 0;
+ preferredBlockSize = 0;
+ }
+
protected INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
short replication, long modificationTime,
long preferredBlockSize) {
@@ -711,6 +735,14 @@
protected StringBytesWritable clientName; // lease holder
protected StringBytesWritable clientMachine;
protected DatanodeDescriptor clientNode; // if client is a cluster node too.
+ protected DatanodeDescriptor[] targets; // locations for last block
+
+ INodeFileUnderConstruction() {
+ clientName = null;
+ clientMachine = null;
+ clientNode = null;
+ targets = null;
+ }
INodeFileUnderConstruction(PermissionStatus permissions,
short replication,
@@ -725,6 +757,27 @@
this.clientName = new StringBytesWritable(clientName);
this.clientMachine = new StringBytesWritable(clientMachine);
this.clientNode = clientNode;
+ this.targets = new DatanodeDescriptor[0];
+ }
+
+ INodeFileUnderConstruction(byte[] name,
+ short blockReplication,
+ long modificationTime,
+ long preferredBlockSize,
+ BlockInfo[] blocks,
+ PermissionStatus perm,
+ String clientName,
+ String clientMachine,
+ DatanodeDescriptor clientNode,
+ DatanodeDescriptor[] targets)
+ throws IOException {
+ super(perm, blocks, blockReplication, modificationTime,
+ preferredBlockSize);
+ setLocalName(name);
+ this.clientName = new StringBytesWritable(clientName);
+ this.clientMachine = new StringBytesWritable(clientMachine);
+ this.clientNode = clientNode;
+ this.targets = targets;
}
String getClientName() throws IOException {
@@ -737,6 +790,14 @@
DatanodeDescriptor getClientNode() {
return clientNode;
+ }
+
+ void setLastBlockLocations(DatanodeDescriptor[] targets) {
+ this.targets = targets;
+ }
+
+ DatanodeDescriptor[] getLastBlockLocations() {
+ return this.targets;
}
/**
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Feb 13 21:35:14 2008
@@ -531,6 +531,11 @@
return namesystem.getContentLength(src);
}
+ /** {@inheritDoc} */
+ public void fsync(String src, String clientName) throws IOException {
+ namesystem.fsync(src, clientName);
+ }
+
////////////////////////////////////////////////////////////////
// DatanodeProtocol
////////////////////////////////////////////////////////////////
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestEditLog.java Wed Feb 13 21:35:14 2008
@@ -38,8 +38,8 @@
static final int numDatanodes = 1;
// This test creates numThreads threads and each thread does
- // numberTransactions Transactions concurrently.
- int numberTransactions = 1000;
+ // 2 * numberTransactions Transactions concurrently.
+ int numberTransactions = 100;
int numThreads = 100;
//
@@ -62,9 +62,16 @@
).createFsOwnerPermissions(new FsPermission((short)0777));
for (int i = 0; i < numTransactions; i++) {
- INodeFile inode = new INodeFile(p, 0, replication, 0, blockSize);
- editLog.logCreateFile("/filename" + i, inode);
- editLog.logSync();
+ try {
+ INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
+ p, replication, blockSize, 0, "", "", null);
+ editLog.logOpenFile("/filename" + i, inode);
+ editLog.logCloseFile("/filename" + i, inode);
+ editLog.logSync();
+ } catch (IOException e) {
+ System.out.println("Transaction " + i + " encountered exception " +
+ e);
+ }
}
}
}
@@ -132,10 +139,16 @@
File editFile = fsimage.getEditFile(i);
System.out.println("Verifying file: " + editFile);
int numEdits = editLog.loadFSEdits(editFile);
+ System.out.println("Number of outstanding leases " +
+ FSNamesystem.getFSNamesystem().countLease());
+
+ assertTrue("Found " + FSNamesystem.getFSNamesystem().countLease() +
+ " leases but expected 0",
+ FSNamesystem.getFSNamesystem().countLease() == 0);
assertTrue("Verification for " + editFile + " failed. " +
- "Expected " + (numThreads * numberTransactions) + "
transactions. "+
+ "Expected " + (numThreads * 2 * numberTransactions) + "
transactions. "+
"Found " + numEdits + " transactions.",
- numEdits == numThreads * numberTransactions);
+ numEdits == numThreads * 2 * numberTransactions);
}
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java?rev=627663&r1=627662&r2=627663&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/dfs/TestFileCreation.java Wed Feb 13 21:35:14 2008
@@ -331,6 +331,64 @@
}
}
+ /**
+ * Test that file leases are persisted across namenode restarts.
+ */
+ public void testFileCreationNamenodeRestart() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setInt("heartbeat.recheck.interval", 1000);
+ conf.setInt("dfs.heartbeat.interval", 1);
+ if (simulatedStorage) {
+ conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
+ }
+ // create cluster
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ cluster.waitActive();
+ int nnport = cluster.getNameNodePort();
+ InetSocketAddress addr = new InetSocketAddress("localhost", nnport);
+
+ try {
+
+ // create a new file.
+ //
+ Path file1 = new Path("/filestatus.dat");
+ FSDataOutputStream stm = createFile(fs, file1, 1);
+ System.out.println("testFileCreationNamenodeRestart: "
+ + "Created file filestatus.dat with one "
+ + " replicas.");
+
+ // restart cluster with the same namenode port as before.
+ cluster.shutdown();
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ cluster = new MiniDFSCluster(nnport, conf, 1, false, true,
+ null, null, null);
+ cluster.waitActive();
+
+ // write 1 byte to file. This should succeed because the
+ // namenode should have persisted leases.
+ byte[] buffer = new byte[1];
+ Random rand = new Random(seed);
+ rand.nextBytes(buffer);
+ stm.write(buffer);
+ stm.close();
+
+ // verify that new block is associated with this file
+ DFSClient client = new DFSClient(addr, conf);
+ LocatedBlocks locations = client.namenode.getBlockLocations(
+ file1.toString(), 0, Long.MAX_VALUE);
+ System.out.println("locations = " + locations.locatedBlockCount());
+ assertTrue("Error blocks were not cleaned up",
+ locations.locatedBlockCount() == 1);
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
/**
* Test that file data becomes available before file is closed.
*/