Author: atm
Date: Fri Dec 16 04:18:58 2011
New Revision: 1215036
URL: http://svn.apache.org/viewvc?rev=1215036&view=rev
Log:
HDFS-2602. NN should log newly-allocated blocks without losing BlockInfo.
Contributed by Aaron T. Myers
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Fri Dec 16 04:18:58 2011
@@ -57,3 +57,5 @@ HDFS-2680. DFSClient should construct fa
HDFS-2683. Authority-based lookup of proxy provider fails if path becomes
canonicalized (todd)
HDFS-2689. HA: BookKeeperEditLogInputStream doesn't implement isInProgress()
(atm)
+
+HDFS-2602. NN should log newly-allocated blocks without losing BlockInfo (atm)
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Dec 16 04:18:58 2011
@@ -125,6 +125,8 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
public static final String DFS_PERMISSIONS_ENABLED_KEY =
"dfs.permissions.enabled";
public static final boolean DFS_PERMISSIONS_ENABLED_DEFAULT = true;
+ public static final String DFS_PERSIST_BLOCKS_KEY = "dfs.persist.blocks";
+ public static final boolean DFS_PERSIST_BLOCKS_DEFAULT = false;
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_KEY =
"dfs.permissions.superusergroup";
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT =
"supergroup";
public static final String DFS_ADMIN = "dfs.cluster.administrators";
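For context, a minimal sketch of how the new key can be switched on (assuming only the standard Configuration/HdfsConfiguration API; the snippet itself is not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // "dfs.persist.blocks" defaults to false; when true, the NN logs each
    // newly-allocated block to the edit log at allocation time.
    conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);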
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Fri Dec 16 04:18:58 2011
@@ -45,6 +45,16 @@ public class HAUtil {
}
/**
+ * Returns true if HA is using a shared edits directory.
+ *
+ * @param conf Configuration
+ * @return true if HA config is using a shared edits dir, false otherwise.
+ */
+ public static boolean usesSharedEditsDir(Configuration conf) {
+ return null != conf.get(DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+ }
+
+ /**
* Get the namenode Id by matching the {@code addressKey}
* with the address of the local node.
*
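A usage sketch for the new helper, mirroring how the FSNamesystem hunk later in this change consults it (illustrative only):

    Configuration conf = new HdfsConfiguration();
    // True iff dfs.namenode.shared.edits.dir is configured, i.e. the HA
    // standby can tail the same edits directory the active NN writes to.
    boolean sharedEdits = HAUtil.usesSharedEditsDir(conf);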
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Dec 16 04:18:58 2011
@@ -425,7 +425,7 @@ public class BlockManager {
final boolean b = commitBlock((BlockInfoUnderConstruction)lastBlock,
commitBlock);
if(countNodes(lastBlock).liveReplicas() >= minReplication)
- completeBlock(fileINode,fileINode.numBlocks()-1);
+ completeBlock(fileINode,fileINode.numBlocks()-1, false);
return b;
}
@@ -437,14 +437,14 @@ public class BlockManager {
* of replicas reported from data-nodes.
*/
private BlockInfo completeBlock(final INodeFile fileINode,
- final int blkIndex) throws IOException {
+ final int blkIndex, boolean force) throws IOException {
if(blkIndex < 0)
return null;
BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
if(curBlock.isComplete())
return curBlock;
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
- if(ucBlock.numNodes() < minReplication)
+ if (!force && ucBlock.numNodes() < minReplication)
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
@@ -455,15 +455,27 @@ public class BlockManager {
}
private BlockInfo completeBlock(final INodeFile fileINode,
- final BlockInfo block) throws IOException {
+ final BlockInfo block, boolean force) throws IOException {
BlockInfo[] fileBlocks = fileINode.getBlocks();
for(int idx = 0; idx < fileBlocks.length; idx++)
if(fileBlocks[idx] == block) {
- return completeBlock(fileINode, idx);
+ return completeBlock(fileINode, idx, force);
}
return block;
}
+
+ /**
+ * Force the given block in the given file to be marked as complete,
+ * regardless of whether enough replicas are present. This is necessary
+ * when tailing edit logs as a Standby.
+ */
+ public BlockInfo forceCompleteBlock(final INodeFile fileINode,
+ final BlockInfoUnderConstruction block) throws IOException {
+ block.commitBlock(block);
+ return completeBlock(fileINode, block, true);
+ }
+
/**
* Convert the last block of the file to an under construction block.<p>
* The block is converted only if the file has blocks and the last one
@@ -590,8 +602,8 @@ public class BlockManager {
final boolean isCorrupt = numCorruptNodes == numNodes;
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
+ int j = 0;
if (numMachines > 0) {
- int j = 0;
for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
it.hasNext();) {
final DatanodeDescriptor d = it.next();
@@ -600,6 +612,12 @@ public class BlockManager {
machines[j++] = d;
}
}
+ assert j == machines.length :
+ "isCorrupt: " + isCorrupt +
+ " numMachines: " + numMachines +
+ " numNodes: " + numNodes +
+ " numCorrupt: " + numCorruptNodes +
+ " numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
blk);
return new LocatedBlock(eb, machines, pos, isCorrupt);
}
@@ -1608,7 +1626,7 @@ public class BlockManager {
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
&& numCurrentReplica >= minReplication)
- storedBlock = completeBlock(storedBlock.getINode(), storedBlock);
+ storedBlock = completeBlock(storedBlock.getINode(), storedBlock, false);
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
@@ -1673,7 +1691,7 @@ public class BlockManager {
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
numLiveReplicas >= minReplication)
- storedBlock = completeBlock(fileINode, storedBlock);
+ storedBlock = completeBlock(fileINode, storedBlock, false);
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
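To make the new force flag concrete, a hedged sketch of the two call paths (surrounding objects assumed; completeBlock is private to BlockManager, so the first call is shown as it reads inside the class):

    // Active NN: completion is gated on replication, as before.
    // Throws IOException if ucBlock.numNodes() < minReplication.
    completeBlock(fileINode, blkIndex, false);

    // Standby tailing edits: no block reports have been processed for this
    // block yet, so it is committed and completed unconditionally.
    blockManager.forceCompleteBlock(fileINode, ucBlock);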
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Dec 16 04:18:58 2011
@@ -263,34 +263,19 @@ public class FSDirectory implements Clos
*/
INode unprotectedAddFile( String path,
PermissionStatus permissions,
- BlockInfo[] blocks,
short replication,
long modificationTime,
long atime,
long preferredBlockSize)
throws UnresolvedLinkException {
INode newNode;
- long diskspace = UNKNOWN_DISK_SPACE;
assert hasWriteLock();
- if (blocks == null)
- newNode = new INodeDirectory(permissions, modificationTime);
- else {
- newNode = new INodeFile(permissions, blocks.length, replication,
- modificationTime, atime, preferredBlockSize);
- diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
- }
+ newNode = new INodeFile(permissions, new BlockInfo[0], replication,
+ modificationTime, atime, preferredBlockSize);
writeLock();
try {
try {
- newNode = addNode(path, newNode, diskspace);
- if(newNode != null && blocks != null) {
- int nrBlocks = blocks.length;
- // Add file->block mapping
- INodeFile newF = (INodeFile)newNode;
- for (int i = 0; i < nrBlocks; i++) {
- newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
- }
- }
+ newNode = addNode(path, newNode, 0);
} catch (IOException e) {
return null;
}
@@ -391,7 +376,7 @@ public class FSDirectory implements Clos
writeUnlock();
}
}
-
+
/**
* Close file.
*/
@@ -414,7 +399,7 @@ public class FSDirectory implements Clos
}
/**
- * Remove a block to the file.
+ * Remove a block from the file.
*/
boolean removeBlock(String path, INodeFileUnderConstruction fileNode,
Block block) throws IOException {
@@ -422,27 +407,32 @@ public class FSDirectory implements Clos
writeLock();
try {
- // modify file-> block and blocksMap
- fileNode.removeLastBlock(block);
- getBlockManager().removeBlockFromMap(block);
-
+ unprotectedRemoveBlock(path, fileNode, block);
// write modified block locations to log
fsImage.getEditLog().logOpenFile(path, fileNode);
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
- +path+" with "+block
- +" block is removed from the file system");
- }
-
- // update space consumed
- INode[] pathINodes = getExistingPathINodes(path);
- updateCount(pathINodes, pathINodes.length-1, 0,
- -fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
} finally {
writeUnlock();
}
return true;
}
+
+ void unprotectedRemoveBlock(String path,
+ INodeFileUnderConstruction fileNode, Block block) throws IOException {
+ // modify file-> block and blocksMap
+ fileNode.removeLastBlock(block);
+ getBlockManager().removeBlockFromMap(block);
+
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+ +path+" with "+block
+ +" block is removed from the file system");
+ }
+
+ // update space consumed
+ INode[] pathINodes = getExistingPathINodes(path);
+ updateCount(pathINodes, pathINodes.length - 1, 0,
+ - fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
+ }
/**
* @see #unprotectedRenameTo(String, String, long)
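The split above separates mutation from logging so the replay path can reuse the mutation half. The resulting call pattern, sketched (names from the diff):

    // Client-driven path on the active NN: mutate, then write a log record.
    fsDir.removeBlock(path, fileNode, block);  // unprotectedRemoveBlock + logOpenFile

    // Replay path (see FSEditLogLoader.updateBlocks below): the op being
    // replayed already is the log record, so only the mutation runs.
    fsDir.unprotectedRemoveBlock(path, fileNode, block);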
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Dec 16 04:18:58 2011
@@ -28,6 +28,7 @@ import java.util.EnumMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.util.Holder;
+
import com.google.common.base.Joiner;
@InterfaceAudience.Private
@@ -137,82 +139,84 @@ public class FSEditLogLoader {
numEdits++;
incrOpCount(op.opCode, opCounts);
switch (op.opCode) {
- case OP_ADD:
- case OP_CLOSE: {
+ case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
- // versions > 0 support per file replication
- // get name and replication
- final short replication = fsNamesys.getBlockManager(
- ).adjustReplication(addCloseOp.replication);
-
- long blockSize = addCloseOp.blockSize;
- BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
- for (int i = 0; i < addCloseOp.blocks.length; i++) {
- if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD
- && i == addCloseOp.blocks.length-1) {
- blocks[i] = new BlockInfoUnderConstruction(addCloseOp.blocks[i],
- replication);
- } else {
- blocks[i] = new BlockInfo(addCloseOp.blocks[i], replication);
+ // See if the file already exists (persistBlocks call)
+ INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+ if (oldFile == null) { // this is OP_ADD on a new file
+ // versions > 0 support per file replication
+ // get name and replication
+ final short replication = fsNamesys.getBlockManager(
+ ).adjustReplication(addCloseOp.replication);
+ PermissionStatus permissions = fsNamesys.getUpgradePermission();
+ if (addCloseOp.permissions != null) {
+ permissions = addCloseOp.permissions;
+ }
+ long blockSize = addCloseOp.blockSize;
+
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+ " numblocks : " + addCloseOp.blocks.length +
+ " clientHolder " + addCloseOp.clientName +
+ " clientMachine " + addCloseOp.clientMachine);
}
- }
-
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- if (addCloseOp.permissions != null) {
- permissions = addCloseOp.permissions;
- }
-
- // Older versions of HDFS does not store the block size in inode.
- // If the file has more than one block, use the size of the
- // first block as the blocksize. Otherwise use the default
- // block size.
- if (-8 <= logVersion && blockSize == 0) {
- if (blocks.length > 1) {
- blockSize = blocks[0].getNumBytes();
- } else {
- long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
- blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+ // Older versions of HDFS does not store the block size in inode.
+ // If the file has more than one block, use the size of the
+ // first block as the blocksize. Otherwise use the default
+ // block size.
+ if (-8 <= logVersion && blockSize == 0) {
+ if (addCloseOp.blocks.length > 1) {
+ blockSize = addCloseOp.blocks[0].getNumBytes();
+ } else {
+ long first = ((addCloseOp.blocks.length == 1)?
+ addCloseOp.blocks[0].getNumBytes(): 0);
+ blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
+ }
}
- }
+ // TODO: We should do away with this add-then-replace dance.
- // The open lease transaction re-creates a file if necessary.
- // Delete the file if it already exists.
- if (FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
- " numblocks : " + blocks.length +
- " clientHolder " + addCloseOp.clientName +
- " clientMachine " + addCloseOp.clientMachine);
+ // add to the file tree
+ INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
+ addCloseOp.path, permissions,
+ replication, addCloseOp.mtime,
+ addCloseOp.atime, blockSize);
+
+ fsNamesys.prepareFileForWrite(addCloseOp.path, node,
+ addCloseOp.clientName, addCloseOp.clientMachine, null);
+ } else { // This is OP_ADD on an existing file
+ if (!oldFile.isUnderConstruction()) {
+ // This is a call to append() on an already-closed file.
+ fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
+ addCloseOp.clientName, addCloseOp.clientMachine, null);
+ oldFile = getINodeFile(fsDir, addCloseOp.path);
+ }
+
+ updateBlocks(fsDir, addCloseOp, oldFile);
}
-
- fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
-
- // add to the file tree
- INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
- addCloseOp.path, permissions,
- blocks, replication,
- addCloseOp.mtime, addCloseOp.atime, blockSize);
- if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
- //
- // Replace current node with a INodeUnderConstruction.
- // Recreate in-memory lease record.
- //
- INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
- node.getLocalNameBytes(),
- node.getReplication(),
- node.getModificationTime(),
- node.getPreferredBlockSize(),
- node.getBlocks(),
- node.getPermissionStatus(),
- addCloseOp.clientName,
- addCloseOp.clientMachine,
- null);
- fsDir.replaceNode(addCloseOp.path, node, cons);
- fsNamesys.leaseManager.addLease(cons.getClientName(),
- addCloseOp.path);
+ break;
+ }
+ case OP_CLOSE: {
+ AddCloseOp addCloseOp = (AddCloseOp)op;
+
+ INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
+ if (oldFile == null) {
+ throw new IOException("Operation trying to close non-existent file " +
+ addCloseOp.path);
}
+
+ // Update in-memory data structures
+ updateBlocks(fsDir, addCloseOp, oldFile);
+
+ // Now close the file
+ INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
+ // TODO: we could use removeLease(holder, path) here, but OP_CLOSE
+ // doesn't seem to serialize the holder... unclear why!
+ fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
+ INodeFile newFile = ucFile.convertToInodeFile();
+ fsDir.replaceNode(addCloseOp.path, ucFile, newFile);
break;
}
case OP_SET_REPLICATION: {
@@ -404,7 +408,88 @@ public class FSEditLogLoader {
}
return numEdits;
}
-
+
+ private static INodeFile getINodeFile(FSDirectory fsDir, String path)
+ throws IOException {
+ INode inode = fsDir.getINode(path);
+ if (inode != null) {
+ if (!(inode instanceof INodeFile)) {
+ throw new IOException("Operation trying to get non-file " + path);
+ }
+ }
+ return (INodeFile)inode;
+ }
+
+ /**
+ * Update in-memory data structures with new block information.
+ * @throws IOException
+ */
+ private void updateBlocks(FSDirectory fsDir, AddCloseOp addCloseOp,
+ INodeFile file) throws IOException {
+
+ // Update the salient file attributes.
+ file.setAccessTime(addCloseOp.atime);
+ file.setModificationTimeForce(addCloseOp.mtime);
+
+ // Update its block list
+ BlockInfo[] oldBlocks = file.getBlocks();
+
+ // Are we only updating the last block's gen stamp.
+ boolean isGenStampUpdate = oldBlocks.length == addCloseOp.blocks.length;
+
+ // First, update blocks in common
+ for (int i = 0; i < oldBlocks.length && i < addCloseOp.blocks.length; i++) {
+ BlockInfo oldBlock = oldBlocks[i];
+ Block newBlock = addCloseOp.blocks[i];
+
+ boolean isLastBlock = i == oldBlocks.length - 1;
+ if (oldBlock.getBlockId() != newBlock.getBlockId() ||
+ (oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
+ !(isGenStampUpdate && isLastBlock))) {
+ throw new IOException("Mismatched block IDs or generation stamps, " +
+ "attempting to replace block " + oldBlock + " with " + newBlock +
+ " as block # " + i + "/" + addCloseOp.blocks.length + " of " +
+ addCloseOp.path);
+ }
+
+ oldBlock.setNumBytes(newBlock.getNumBytes());
+ oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
+
+ if (oldBlock instanceof BlockInfoUnderConstruction &&
+ (!isLastBlock || addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE)) {
+ fsNamesys.getBlockManager().forceCompleteBlock(
+ (INodeFileUnderConstruction)file,
+ (BlockInfoUnderConstruction)oldBlock);
+ }
+ }
+
+ if (addCloseOp.blocks.length < oldBlocks.length) {
+ // We're removing a block from the file, e.g. abandonBlock(...)
+ if (!file.isUnderConstruction()) {
+ throw new IOException("Trying to remove a block from file " +
+ addCloseOp.path + " which is not under construction.");
+ }
+ if (addCloseOp.blocks.length != oldBlocks.length - 1) {
+ throw new IOException("Trying to remove more than one block from file "
+ + addCloseOp.path);
+ }
+ fsDir.unprotectedRemoveBlock(addCloseOp.path,
+ (INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
+ } else if (addCloseOp.blocks.length > oldBlocks.length) {
+ // We're adding blocks
+ for (int i = oldBlocks.length; i < addCloseOp.blocks.length; i++) {
+ Block newBlock = addCloseOp.blocks[i];
+ BlockInfo newBI = new BlockInfoUnderConstruction(newBlock, file.getReplication());
+ fsNamesys.getBlockManager().addINode(newBI, file);
+ file.addBlock(newBI);
+ }
+ }
+
+ if (addCloseOp.blocks.length > 0) {
+ fsNamesys.notifyGenStampUpdate(
+ addCloseOp.blocks[addCloseOp.blocks.length - 1].getGenerationStamp());
+ }
+ }
private static void dumpOpCounts(
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts) {
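The reconciliation in updateBlocks() reduces to three cases, keyed on the relative lengths of the inode's block list and the logged op's block list. A self-contained toy model (all names hypothetical; blocks reduced to bare IDs):

    import java.io.IOException;
    import java.util.List;

    /** Toy model of FSEditLogLoader.updateBlocks(); a block is just its ID. */
    class BlockListReconciler {
      static void reconcile(List<Long> inodeBlocks, List<Long> loggedBlocks)
          throws IOException {
        int common = Math.min(inodeBlocks.size(), loggedBlocks.size());
        for (int i = 0; i < common; i++) {
          // Case 1: blocks in common must match; the real code also refreshes
          // numBytes and (for the last block) the generation stamp here.
          if (!inodeBlocks.get(i).equals(loggedBlocks.get(i))) {
            throw new IOException("Mismatched block IDs at index " + i);
          }
        }
        if (loggedBlocks.size() == inodeBlocks.size() - 1) {
          // Case 2: abandonBlock() removed exactly one trailing block.
          inodeBlocks.remove(inodeBlocks.size() - 1);
        } else if (loggedBlocks.size() < inodeBlocks.size()) {
          throw new IOException("Trying to remove more than one block");
        } else {
          // Case 3: new allocations; append the tail as under-construction.
          inodeBlocks.addAll(loggedBlocks.subList(common, loggedBlocks.size()));
        }
      }
    }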
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Dec 16 04:18:58 2011
@@ -52,6 +52,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
@@ -203,7 +205,7 @@ import com.google.common.base.Preconditi
@Metrics(context="dfs")
public class FSNamesystem implements Namesystem, FSClusterStats,
FSNamesystemMBean, NameNodeMXBean {
- static final Log LOG = LogFactory.getLog(FSNamesystem.class);
+ public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
private static final ThreadLocal<StringBuilder> auditBuffer =
new ThreadLocal<StringBuilder>() {
@@ -252,6 +254,7 @@ public class FSNamesystem implements Nam
static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100;
static int BLOCK_DELETION_INCREMENT = 1000;
private boolean isPermissionEnabled;
+ private boolean persistBlocks;
private UserGroupInformation fsOwner;
private String supergroup;
private PermissionStatus defaultPermission;
@@ -669,6 +672,15 @@ public class FSNamesystem implements Nam
DFS_PERMISSIONS_ENABLED_DEFAULT);
LOG.info("supergroup=" + supergroup);
LOG.info("isPermissionEnabled=" + isPermissionEnabled);
+
+ this.persistBlocks = conf.getBoolean(DFS_PERSIST_BLOCKS_KEY,
+ DFS_PERSIST_BLOCKS_DEFAULT);
+ // block allocation has to be persisted in HA using a shared edits directory
+ // so that the standby has up-to-date namespace information
+ String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+ this.persistBlocks |= HAUtil.isHAEnabled(conf, nameserviceId) &&
+ HAUtil.usesSharedEditsDir(conf);
+
short filePermission = (short)conf.getInt(DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
this.defaultPermission = PermissionStatus.createImmutable(
@@ -1403,26 +1415,7 @@ public class FSNamesystem implements Nam
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
if (append && myFile != null) {
- //
- // Replace current node with a INodeUnderConstruction.
- // Recreate in-memory lease record.
- //
- INodeFile node = (INodeFile) myFile;
- INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
- node.getLocalNameBytes(),
- node.getReplication(),
- node.getModificationTime(),
- node.getPreferredBlockSize(),
- node.getBlocks(),
- node.getPermissionStatus(),
- holder,
- clientMachine,
- clientNode);
- dir.replaceNode(src, node, cons);
- leaseManager.addLease(cons.getClientName(), src);
-
- // convert last block to under-construction
- return blockManager.convertLastBlockToUnderConstruction(cons);
+ return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode);
} else {
// Now we can add the name to the filesystem. This file has no
// blocks associated with it.
@@ -1450,6 +1443,39 @@ public class FSNamesystem implements Nam
}
return null;
}
+
+ /**
+ * Replace current node with a INodeUnderConstruction.
+ * Recreate in-memory lease record.
+ *
+ * @param src path to the file
+ * @param file existing file object
+ * @param leaseHolder identifier of the lease holder on this file
+ * @param clientMachine identifier of the client machine
+ * @param clientNode if the client is collocated with a DN, that DN's descriptor
+ * @return the last block locations if the block is partial or null otherwise
+ * @throws UnresolvedLinkException
+ * @throws IOException
+ */
+ public LocatedBlock prepareFileForWrite(String src, INode file,
+ String leaseHolder, String clientMachine, DatanodeDescriptor clientNode)
+ throws UnresolvedLinkException, IOException {
+ INodeFile node = (INodeFile) file;
+ INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
+ node.getLocalNameBytes(),
+ node.getReplication(),
+ node.getModificationTime(),
+ node.getPreferredBlockSize(),
+ node.getBlocks(),
+ node.getPermissionStatus(),
+ leaseHolder,
+ clientMachine,
+ clientNode);
+ dir.replaceNode(src, node, cons);
+ leaseManager.addLease(cons.getClientName(), src);
+
+ return blockManager.convertLastBlockToUnderConstruction(cons);
+ }
/**
* Recover lease;
@@ -1700,10 +1726,14 @@ public class FSNamesystem implements Nam
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
- }
+ }
+ dir.persistBlocks(src, pendingFile);
} finally {
writeUnlock();
}
+ if (persistBlocks) {
+ getEditLog().logSync();
+ }
// Create next block
LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets,
fileLength);
@@ -1782,10 +1812,15 @@ public class FSNamesystem implements Nam
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b + " is removed from pendingCreates");
}
- return true;
+ dir.persistBlocks(src, file);
} finally {
writeUnlock();
}
+ if (persistBlocks) {
+ getEditLog().logSync();
+ }
+
+ return true;
}
// make sure that we still have the lease on this file.
@@ -2594,8 +2629,8 @@ public class FSNamesystem implements Nam
//remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile);
} else if (supportAppends) {
- // If this commit does not want to close the file, persist
- // blocks only if append is supported
+ // If this commit does not want to close the file, persist blocks
+ // only if append is supported or we're explicitly told to
dir.persistBlocks(src, pendingFile);
}
} finally {
@@ -3565,7 +3600,8 @@ public class FSNamesystem implements Nam
}
assert node != null : "Found a lease for nonexisting file.";
assert node.isUnderConstruction() :
- "Found a lease for file that is not under construction.";
+ "Found a lease for file " + path + " that is not under
construction." +
+ " lease=" + lease;
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
BlockInfo[] blocks = cons.getBlocks();
if(blocks == null)
@@ -3881,7 +3917,6 @@ public class FSNamesystem implements Nam
*/
void setGenerationStamp(long stamp) {
generationStamp.setStamp(stamp);
- notifyGenStampUpdate(stamp);
}
/**
@@ -4000,7 +4035,7 @@ public class FSNamesystem implements Nam
} finally {
writeUnlock();
}
- if (supportAppends) {
+ if (supportAppends || persistBlocks) {
getEditLog().logSync();
}
LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java Fri Dec 16 04:18:58 2011
@@ -153,6 +153,9 @@ public class LeaseManager {
Lease lease = getLease(holder);
if (lease != null) {
removeLease(lease, src);
+ } else {
+ LOG.warn("Removing non-existent lease! holder=" + holder +
+ " src=" + src);
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/PendingDataNodeMessages.java Fri Dec 16 04:18:58 2011
@@ -188,7 +188,7 @@ public class PendingDataNodeMessages {
*/
synchronized DataNodeMessage take(long gs) {
DataNodeMessage m = queue.peek();
- if (m != null && m.getTargetGs() < gs) {
+ if (m != null && m.getTargetGs() <= gs) {
return queue.remove();
} else {
return null;
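The one-character change above is a boundary fix: a queued datanode message whose target generation stamp equals the current stamp is now deliverable instead of stuck. A toy illustration (hypothetical values):

    long currentGs = 5, targetGs = 5;
    boolean before = targetGs <  currentGs;  // false: message stayed queued
    boolean after  = targetGs <= currentGs;  // true:  message is taken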
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java Fri Dec 16 04:18:58 2011
@@ -152,4 +152,5 @@ public class EditLogTailer {
}
}
}
+
}
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java?rev=1215036&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPersistBlocks.java Fri Dec 16 04:18:58 2011
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
+
+import java.io.IOException;
+import java.util.Random;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * A JUnit test for checking if restarting DFS preserves the
+ * blocks that are part of an unclosed file.
+ */
+public class TestPersistBlocks {
+ static {
+ ((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ private static final int BLOCK_SIZE = 4096;
+ private static final int NUM_BLOCKS = 5;
+
+ private static final String FILE_NAME = "/data";
+ private static final Path FILE_PATH = new Path(FILE_NAME);
+
+ static final byte[] DATA_BEFORE_RESTART = new byte[BLOCK_SIZE * NUM_BLOCKS];
+ static final byte[] DATA_AFTER_RESTART = new byte[BLOCK_SIZE * NUM_BLOCKS];
+ static {
+ Random rand = new Random();
+ rand.nextBytes(DATA_BEFORE_RESTART);
+ rand.nextBytes(DATA_AFTER_RESTART);
+ }
+
+ /** check if DFS remains in proper condition after a restart */
+ @Test
+ public void testRestartDfs() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ // Turn off persistent IPC, so that the DFSClient can survive NN restart
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+ 0);
+ conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
+ MiniDFSCluster cluster = null;
+
+ long len = 0;
+ FSDataOutputStream stream;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ FileSystem fs = cluster.getFileSystem();
+ // Creating a file with 4096 blockSize to write multiple blocks
+ stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+ stream.write(DATA_BEFORE_RESTART);
+ stream.hflush();
+
+ // Wait for at least a few blocks to get through
+ while (len <= BLOCK_SIZE) {
+ FileStatus status = fs.getFileStatus(FILE_PATH);
+ len = status.getLen();
+ Thread.sleep(100);
+ }
+
+ // explicitly do NOT close the file.
+ cluster.restartNameNode();
+
+ // Check that the file has no less bytes than before the restart
+ // This would mean that blocks were successfully persisted to the log
+ FileStatus status = fs.getFileStatus(FILE_PATH);
+ assertTrue("Length too short: " + status.getLen(),
+ status.getLen() >= len);
+
+ // And keep writing (ensures that leases are also persisted correctly)
+ stream.write(DATA_AFTER_RESTART);
+ stream.close();
+
+ // Verify that the data showed up, both from before and after the restart.
+ FSDataInputStream readStream = fs.open(FILE_PATH);
+ try {
+ byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length];
+ IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+ assertArrayEquals(DATA_BEFORE_RESTART, verifyBuf);
+
+ IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+ assertArrayEquals(DATA_AFTER_RESTART, verifyBuf);
+ } finally {
+ IOUtils.closeStream(readStream);
+ }
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
+
+ @Test
+ public void testRestartDfsWithAbandonedBlock() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ // Turn off persistent IPC, so that the DFSClient can survive NN restart
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+ 0);
+ conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
+ MiniDFSCluster cluster = null;
+
+ long len = 0;
+ FSDataOutputStream stream;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ FileSystem fs = cluster.getFileSystem();
+ // Creating a file with 4096 blockSize to write multiple blocks
+ stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+ stream.write(DATA_BEFORE_RESTART);
+ stream.hflush();
+
+ // Wait for all of the blocks to get through
+ while (len < BLOCK_SIZE * (NUM_BLOCKS - 1)) {
+ FileStatus status = fs.getFileStatus(FILE_PATH);
+ len = status.getLen();
+ Thread.sleep(100);
+ }
+
+ // Abandon the last block
+ DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
+ LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(
+ FILE_NAME, 0, BLOCK_SIZE * NUM_BLOCKS);
+ assertEquals(NUM_BLOCKS, blocks.getLocatedBlocks().size());
+ LocatedBlock b = blocks.getLastLocatedBlock();
+ dfsclient.getNamenode().abandonBlock(b.getBlock(), FILE_NAME,
+ dfsclient.clientName);
+
+ // explicitly do NOT close the file.
+ cluster.restartNameNode();
+
+ // Check that the file has no less bytes than before the restart
+ // This would mean that blocks were successfully persisted to the log
+ FileStatus status = fs.getFileStatus(FILE_PATH);
+ assertTrue("Length incorrect: " + status.getLen(),
+ status.getLen() != len - BLOCK_SIZE);
+
+ // Verify the data showed up from before restart, sans abandoned block.
+ FSDataInputStream readStream = fs.open(FILE_PATH);
+ try {
+ byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length - BLOCK_SIZE];
+ IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+ byte[] expectedBuf = new byte[DATA_BEFORE_RESTART.length - BLOCK_SIZE];
+ System.arraycopy(DATA_BEFORE_RESTART, 0,
+ expectedBuf, 0, expectedBuf.length);
+ assertArrayEquals(expectedBuf, verifyBuf);
+ } finally {
+ IOUtils.closeStream(readStream);
+ }
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
+
+ @Test
+ public void testRestartWithPartialBlockHflushed() throws IOException {
+ final Configuration conf = new HdfsConfiguration();
+ // Turn off persistent IPC, so that the DFSClient can survive NN restart
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+ 0);
+ conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
+ MiniDFSCluster cluster = null;
+
+ FSDataOutputStream stream;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ FileSystem fs = cluster.getFileSystem();
+ NameNode.getAddress(conf).getPort();
+ // Creating a file with 4096 blockSize to write multiple blocks
+ stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+ stream.write(DATA_BEFORE_RESTART);
+ stream.write((byte)1);
+ stream.hflush();
+
+ // explicitly do NOT close the file before restarting the NN.
+ cluster.restartNameNode();
+
+ // this will fail if the final block of the file is prematurely COMPLETEd
+ stream.write((byte)2);
+ stream.hflush();
+ stream.close();
+
+ assertEquals(DATA_BEFORE_RESTART.length + 2,
+ fs.getFileStatus(FILE_PATH).getLen());
+
+ FSDataInputStream readStream = fs.open(FILE_PATH);
+ try {
+ byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length + 2];
+ IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+ byte[] expectedBuf = new byte[DATA_BEFORE_RESTART.length + 2];
+ System.arraycopy(DATA_BEFORE_RESTART, 0, expectedBuf, 0,
+ DATA_BEFORE_RESTART.length);
+ System.arraycopy(new byte[]{1, 2}, 0, expectedBuf,
+ DATA_BEFORE_RESTART.length, 2);
+ assertArrayEquals(expectedBuf, verifyBuf);
+ } finally {
+ IOUtils.closeStream(readStream);
+ }
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
+
+ @Test
+ public void testRestartWithAppend() throws IOException {
+ final Configuration conf = new HdfsConfiguration();
+ // Turn off persistent IPC, so that the DFSClient can survive NN restart
+ conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+ 0);
+ conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
+ MiniDFSCluster cluster = null;
+
+ FSDataOutputStream stream;
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ FileSystem fs = cluster.getFileSystem();
+ NameNode.getAddress(conf).getPort();
+ // Creating a file with 4096 blockSize to write multiple blocks
+ stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
+ stream.write(DATA_BEFORE_RESTART, 0, DATA_BEFORE_RESTART.length / 2);
+ stream.close();
+ stream = fs.append(FILE_PATH, BLOCK_SIZE);
+ stream.write(DATA_BEFORE_RESTART, DATA_BEFORE_RESTART.length / 2,
+ DATA_BEFORE_RESTART.length / 2);
+ stream.close();
+
+ assertEquals(DATA_BEFORE_RESTART.length,
+ fs.getFileStatus(FILE_PATH).getLen());
+
+ cluster.restartNameNode();
+
+ assertEquals(DATA_BEFORE_RESTART.length,
+ fs.getFileStatus(FILE_PATH).getLen());
+
+ FSDataInputStream readStream = fs.open(FILE_PATH);
+ try {
+ byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length];
+ IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
+ assertArrayEquals(DATA_BEFORE_RESTART, verifyBuf);
+ } finally {
+ IOUtils.closeStream(readStream);
+ }
+ } finally {
+ if (cluster != null) { cluster.shutdown(); }
+ }
+ }
+}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java?rev=1215036&r1=1215035&r2=1215036&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java Fri Dec 16 04:18:58 2011
@@ -116,10 +116,12 @@ public class TestEditLog extends TestCas
int numTransactions;
short replication = 3;
long blockSize = 64;
+ final int id;
- Transactions(FSNamesystem ns, int num) {
+ Transactions(FSNamesystem ns, int num, int id) {
namesystem = ns;
numTransactions = num;
+ this.id = id;
}
// add a bunch of transactions.
@@ -131,8 +133,9 @@ public class TestEditLog extends TestCas
for (int i = 0; i < numTransactions; i++) {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
p, replication, blockSize, 0, "", "", null);
- editLog.logOpenFile("/filename" + i, inode);
- editLog.logCloseFile("/filename" + i, inode);
+ String fileName = "/filename-" + id + "-" + i;
+ editLog.logOpenFile(fileName, inode);
+ editLog.logCloseFile(fileName, inode);
editLog.logSync();
}
}
@@ -280,7 +283,7 @@ public class TestEditLog extends TestCas
// Create threads and make them run transactions concurrently.
Thread threadId[] = new Thread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
- Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
+ Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS, i);
threadId[i] = new Thread(trans, "TransactionThread-" + i);
threadId[i].start();
}