Author: shv
Date: Mon Jul 7 16:42:30 2008
New Revision: 674675
URL: http://svn.apache.org/viewvc?rev=674675&view=rev
Log:
Revert changes from revisions 674657-674658 related to HADOOP-3002.
Modified:
hadoop/core/branches/branch-0.18/CHANGES.txt
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java
Modified: hadoop/core/branches/branch-0.18/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/CHANGES.txt?rev=674675&r1=674674&r2=674675&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.18/CHANGES.txt Mon Jul 7 16:42:30 2008
@@ -755,8 +755,6 @@
HADOOP-3633. Correct exception handling in DataXceiveServer, and throttle
the number of xceiver threads in a data-node. (shv)
- HADOOP-3002. HDFS should not remove blocks while in safemode. (shv)
-
Release 0.17.0 - 2008-05-18
INCOMPATIBLE CHANGES
Modified:
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java?rev=674675&r1=674674&r2=674675&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/DatanodeDescriptor.java Mon Jul 7 16:42:30 2008
@@ -344,8 +344,7 @@
void reportDiff(BlocksMap blocksMap,
BlockListAsLongs newReport,
Collection<Block> toAdd,
- Collection<Block> toRemove,
- Collection<Block> toInvalidate) {
+ Collection<Block> toRemove) {
// place a delimiter in the list which separates blocks
// that have been reported from those that have not
BlockInfo delimiter = new BlockInfo(new Block(), 1);
@@ -361,9 +360,8 @@
iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
newReport.getBlockGenStamp(i));
BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
- if(storedBlock == null) {
- // If block is not in blocksMap it does not belong to any file
- toInvalidate.add(new Block(iblk));
+ if(storedBlock == null) { // Brand new block
+ toAdd.add(new Block(iblk));
continue;
}
if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
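For readers following the revert: the restored reportDiff treats a reported block that is absent from the blocks map as brand new and queues it on toAdd, where the reverted HADOOP-3002 version queued it for invalidation instead. A minimal sketch of that classification, using plain Java collections and a hypothetical simplified Block type in place of the real BlocksMap/BlockInfo machinery (generation-stamp and length handling omitted):

    import java.util.*;

    class ReportDiffSketch {
      // Hypothetical stand-in for the real Block class.
      static class Block {
        final long id;
        Block(long id) { this.id = id; }
      }

      // Post-revert classification: unknown reported blocks go to toAdd;
      // stored blocks missing from the report go to toRemove. There is
      // no toInvalidate list any more.
      static void reportDiff(Map<Long, Block> blocksMap,
                             List<Block> newReport,
                             Collection<Block> toAdd,
                             Collection<Block> toRemove) {
        Set<Long> reported = new HashSet<Long>();
        for (Block b : newReport) {
          reported.add(b.id);
          if (!blocksMap.containsKey(b.id)) {
            toAdd.add(b);          // brand new block
          }
        }
        for (Block stored : blocksMap.values()) {
          if (!reported.contains(stored.id)) {
            toRemove.add(stored);  // known block no longer on this node
          }
        }
      }
    }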
Modified:
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java?rev=674675&r1=674674&r2=674675&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/FSNamesystem.java Mon Jul 7 16:42:30 2008
@@ -2085,36 +2085,37 @@
nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
updateStats(nodeinfo, true);
- // If the datanode has (just) been resolved and we haven't ever processed
- // a block report from it yet, ask for one now.
- if (!blockReportProcessed(nodeReg)) {
- // If we never processed a block report from this datanode, we shouldn't
- // have any work for that as well
- assert(cmd == null);
- if (isResolved(nodeReg)) {
- return DatanodeCommand.BLOCKREPORT;
- }
- }
-
- if (isInSafeMode()) {
- //check distributed upgrade
- return getDistributedUpgradeCommand();
- }
-
- // All other commands are not allowed in safe mode.
//check lease recovery
- cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
- if (cmd != null)
- return cmd;
+ if (cmd == null) {
+ cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+ }
//check pending replication
- cmd = nodeinfo.getReplicationCommand(
- maxReplicationStreams - xmitsInProgress);
- if (cmd != null)
- return cmd;
+ if (cmd == null) {
+ cmd = nodeinfo.getReplicationCommand(
+ maxReplicationStreams - xmitsInProgress);
+ }
//check block invalidation
- return nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+ if (cmd == null) {
+ cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+ }
}
}
+
+ // If the datanode has (just) been resolved and we haven't ever processed
+ // a block report from it yet, ask for one now.
+ if (!blockReportProcessed(nodeReg)) {
+ // If we never processed a block report from this datanode, we shouldn't
+ // have any work for that as well
+ assert(cmd == null);
+ if (isResolved(nodeReg)) {
+ return DatanodeCommand.BLOCKREPORT;
+ }
+ }
+ //check distributed upgrade
+ if (cmd == null) {
+ cmd = getDistributedUpgradeCommand();
+ }
+ return cmd;
}
private void updateStats(DatanodeDescriptor node, boolean isAdded) {
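The restored handleHeartbeat hands back at most one command per heartbeat, filling cmd only while every earlier check came up empty. A condensed sketch of that selection order, with all of the real FSNamesystem/DatanodeDescriptor calls replaced by hypothetical stubs:

    class HeartbeatSketch {
      static final Object BLOCKREPORT = new Object();  // stand-in command

      Object getLeaseRecoveryCommand()      { return null; }  // stub
      Object getReplicationCommand()        { return null; }  // stub
      Object getInvalidateCommand()         { return null; }  // stub
      Object getDistributedUpgradeCommand() { return null; }  // stub
      boolean blockReportProcessed()        { return true; }  // stub
      boolean isResolved()                  { return true; }  // stub

      // Selection order after the revert: lease recovery, replication,
      // invalidation, then a first block report if none was processed,
      // and finally a distributed-upgrade command.
      Object pickCommand() {
        Object cmd = getLeaseRecoveryCommand();
        if (cmd == null) cmd = getReplicationCommand();
        if (cmd == null) cmd = getInvalidateCommand();
        if (!blockReportProcessed()) {
          assert cmd == null;     // no report yet implies no work yet
          if (isResolved()) return BLOCKREPORT;
        }
        if (cmd == null) cmd = getDistributedUpgradeCommand();
        return cmd;
      }
    }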
@@ -2197,9 +2198,6 @@
int workFound = 0;
int blocksToProcess = 0;
int nodesToProcess = 0;
- // blocks should not be replicated or removed if safe mode is on
- if (isInSafeMode())
- return workFound;
synchronized(heartbeats) {
blocksToProcess = (int)(heartbeats.size()
* ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
@@ -2242,6 +2240,9 @@
private synchronized int computeReplicationWork(
int blocksToProcess) throws IOException {
int scheduledReplicationCount = 0;
+ // blocks should not be replicated or removed if safe mode is on
+ if (isInSafeMode())
+ return scheduledReplicationCount;
synchronized(neededReplications) {
// # of blocks to process equals either twice the number of live
@@ -2617,9 +2618,9 @@
* The given node is reporting all its blocks. Use this info to
* update the (machine-->blocklist) and (block-->machinelist) tables.
*/
- public synchronized void processReport(DatanodeID nodeID,
- BlockListAsLongs newReport
- ) throws IOException {
+ public synchronized Block[] processReport(DatanodeID nodeID,
+ BlockListAsLongs newReport
+ ) throws IOException {
long startTime = now();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
@@ -2641,7 +2642,7 @@
if (node.getNetworkLocation().equals(NetworkTopology.UNRESOLVED)) {
LOG.info("Ignoring block report from " + nodeID.getName() +
" because rack location for this datanode is still to be
resolved.");
- return; //drop the block report if the dn hasn't been resolved
+ return null; //drop the block report if the dn hasn't been resolved
}
node.setBlockReportProcessed(true);
@@ -2651,8 +2652,7 @@
//
Collection<Block> toAdd = new LinkedList<Block>();
Collection<Block> toRemove = new LinkedList<Block>();
- Collection<Block> toInvalidate = new LinkedList<Block>();
- node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
+ node.reportDiff(blocksMap, newReport, toAdd, toRemove);
for (Block b : toRemove) {
removeStoredBlock(b, node);
@@ -2660,13 +2660,41 @@
for (Block b : toAdd) {
addStoredBlock(b, node, null);
}
- for (Block b : toInvalidate) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
- + b + " on " + node.getName() + " size " + b.getNumBytes()
- + " does not belong to any file.");
- addToInvalidates(b, node);
+
+ //
+ // We've now completely updated the node's block report profile.
+ // We now go through all its blocks and find which ones are invalid,
+ // no longer pending, or over-replicated.
+ //
+ // (Note it's not enough to just invalidate blocks at lease expiry
+ // time; datanodes can go down before the client's lease on
+ // the failed file expires and miss the "expire" event.)
+ //
+ // This function considers every block on a datanode, and thus
+ // should only be invoked infrequently.
+ //
+ Collection<Block> obsolete = new ArrayList<Block>();
+ for (Iterator<Block> it = node.getBlockIterator(); it.hasNext();) {
+ Block b = it.next();
+
+ //
+ // A block report can only send BLOCK_INVALIDATE_CHUNK number of
+ // blocks to be deleted. If there are more blocks to be deleted,
+ // they are added to recentInvalidateSets and will be sent out
+ // through succeeding heartbeat responses.
+ //
+ if (!isValidBlock(b)) {
+ if (obsolete.size() > blockInvalidateLimit) {
+ addToInvalidates(b, node);
+ } else {
+ obsolete.add(b);
+ }
+ NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
+ +"ask "+nodeID.getName()+" to delete
"+b);
+ }
}
NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
+ return obsolete.toArray(new Block[obsolete.size()]);
}
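The restored tail of processReport scans every block the node now holds, returns up to the invalidate limit of them directly to the caller for deletion, and defers the overflow to later heartbeat responses. A self-contained sketch of that chunking (the limit value and the deferred queue are hypothetical stand-ins for blockInvalidateLimit and recentInvalidateSets; the size check mirrors the diff, so the returned batch can hold limit+1 entries):

    import java.util.*;

    class ObsoleteBlockSketch {
      static final int BLOCK_INVALIDATE_LIMIT = 100;  // assumed value

      static Long[] collectObsolete(Iterable<Long> nodeBlocks,
                                    Set<Long> validBlocks,
                                    List<Long> deferredInvalidates) {
        List<Long> obsolete = new ArrayList<Long>();
        for (Long b : nodeBlocks) {
          if (!validBlocks.contains(b)) {
            if (obsolete.size() > BLOCK_INVALIDATE_LIMIT) {
              deferredInvalidates.add(b);   // sent via later heartbeats
            } else {
              obsolete.add(b);              // returned with this report
            }
          }
        }
        return obsolete.toArray(new Long[obsolete.size()]);
      }
    }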
/**
@@ -2678,6 +2706,7 @@
DatanodeDescriptor node,
DatanodeDescriptor delNodeHint) {
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ INodeFile fileINode = null;
boolean added = false;
if(storedBlock == null) { // block is not in the blocksMaps
// add block to the blocksMap and to the data-node
@@ -2689,6 +2718,7 @@
}
assert storedBlock != null : "Block must be stored by now";
+ fileINode = storedBlock.getINode();
if (block != storedBlock) {
if (block.getNumBytes() > 0) {
long cursize = storedBlock.getNumBytes();
@@ -2763,8 +2793,17 @@
+ block + " on " + node.getName()
+ " size " + block.getNumBytes());
}
-
- assert isValidBlock(storedBlock) : "Trying to add an invalid block";
+ //
+ // If this block does not belong to any file, then we are done.
+ //
+ if (fileINode == null) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+ + "addStoredBlock request received for "
+ + block + " on " + node.getName()
+ + " size " + block.getNumBytes()
+ + " But it does not belong to any file.");
+ return block;
+ }
// filter out containingNodes that are marked for decommission.
NumberReplicas num = countNodes(storedBlock);
@@ -2779,8 +2818,6 @@
// if file is being actively written to, then do not check
// replication-factor here. It will be checked when the file is closed.
//
- INodeFile fileINode = null;
- fileINode = storedBlock.getINode();
if (fileINode.isUnderConstruction()) {
return block;
}
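In addStoredBlock the revert captures the owning file up front and replaces the old validity assert with a logged early return when the block belongs to no file. A minimal sketch of that guard, with hypothetical pared-down BlockInfo/INodeFile types:

    class AddStoredBlockSketch {
      static class INodeFile {}
      static class BlockInfo {
        INodeFile inode;                    // null for an orphaned block
        INodeFile getINode() { return inode; }
      }

      BlockInfo addStoredBlock(BlockInfo storedBlock) {
        INodeFile fileINode =
            (storedBlock == null) ? null : storedBlock.getINode();
        // ... update blocksMap and the data-node's block list ...
        if (fileINode == null) {
          // Orphaned block: log and stop, instead of tripping an assert.
          System.out.println("addStoredBlock request received for a block"
              + " that does not belong to any file");
          return storedBlock;
        }
        // ... replication-factor checks against fileINode follow ...
        return storedBlock;
      }
    }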
Modified:
hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java?rev=674675&r1=674674&r2=674675&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java (original)
+++ hadoop/core/branches/branch-0.18/src/hdfs/org/apache/hadoop/dfs/NameNode.java Mon Jul 7 16:42:30 2008
@@ -610,7 +610,9 @@
stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
+"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks() +"
blocks");
- namesystem.processReport(nodeReg, blist);
+ Block blocksToDelete[] = namesystem.processReport(nodeReg, blist);
+ if (blocksToDelete != null && blocksToDelete.length > 0)
+ return new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blocksToDelete);
if (getFSImage().isUpgradeFinalized())
return DatanodeCommand.FINALIZE;
return null;
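Net effect on the protocol: blockReport now turns any obsolete blocks found while processing the report into an immediate invalidate command for the reporting data-node. A condensed reading, with hypothetical stand-ins for the real Block/BlockCommand types and an assumed action code:

    class BlockReportSketch {
      static final int DNA_INVALIDATE = 2;     // assumed action code

      static class Block {}
      static class BlockCommand {
        BlockCommand(int action, Block[] blocks) { /* carries payload */ }
      }

      Block[] processReport() { return new Block[0]; }  // stub namesystem
      boolean isUpgradeFinalized() { return true; }     // stub FSImage

      Object blockReport() {
        Block[] blocksToDelete = processReport();
        if (blocksToDelete != null && blocksToDelete.length > 0)
          return new BlockCommand(DNA_INVALIDATE, blocksToDelete);
        if (isUpgradeFinalized())
          return "FINALIZE";   // stands in for DatanodeCommand.FINALIZE
        return null;
      }
    }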