Author: shv
Date: Sun May 11 09:48:20 2008
New Revision: 655337
URL: http://svn.apache.org/viewvc?rev=655337&view=rev
Log:
HADOOP-3369. Fast block processing during name-node startup. Contributed by Konstantin Shvachko.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java
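
Note (illustrative sketch, not part of the committed patch): with this change the
name-node skips mis-replication handling for individual blocks while it is in safe
mode, and instead runs a single pass over the blocks map when it leaves safe mode,
via processMisReplicatedBlocks(). Each block is classified as invalid (it no longer
belongs to any file), under-replicated, or over-replicated, and queued accordingly.
The self-contained Java sketch below mirrors that classification under simplified
assumptions; the BlockState record and the sample values are hypothetical stand-ins
for BlocksMap.BlockInfo, the INode lookup, and the replica counts that countNodes()
would report.

import java.util.List;

public class MisReplicationSketch {
  // Hypothetical stand-in for a block entry: the owning file (null means the
  // block is orphaned), its expected replication factor, and the live replicas
  // currently known to the name-node.
  record BlockState(String fileName, int expectedReplication, int liveReplicas) {}

  public static void main(String[] args) {
    List<BlockState> blocksMap = List.of(
        new BlockState(null, 3, 2),   // orphaned block -> invalidate on all nodes
        new BlockState("/a", 3, 1),   // under-replicated -> needed-replications queue
        new BlockState("/b", 3, 5),   // over-replicated -> excess-replica handling
        new BlockState("/c", 3, 3));  // healthy -> nothing to do

    long nrInvalid = 0, nrUnderReplicated = 0, nrOverReplicated = 0;
    for (BlockState block : blocksMap) {
      if (block.fileName() == null) {
        nrInvalid++;                  // patch does: addToInvalidates(block)
        continue;
      }
      if (block.liveReplicas() < block.expectedReplication()) {
        nrUnderReplicated++;          // patch does: neededReplications.add(...)
      } else if (block.liveReplicas() > block.expectedReplication()) {
        nrOverReplicated++;           // patch does: proccessOverReplicatedBlock(...)
      }
    }
    System.out.println("Total number of blocks = " + blocksMap.size());
    System.out.println("Number of invalid blocks = " + nrInvalid);
    System.out.println("Number of under-replicated blocks = " + nrUnderReplicated);
    System.out.println("Number of over-replicated blocks = " + nrOverReplicated);
  }
}
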
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=655337&r1=655336&r2=655337&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sun May 11 09:48:20 2008
@@ -133,6 +133,8 @@
HADOOP-3364. Faster image and log edits loading. (shv)
+ HADOOP-3369. Fast block processing during name-node startup. (shv)
+
BUG FIXES
HADOOP-2905. 'fsck -move' triggers NPE in NameNode.
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java?rev=655337&r1=655336&r2=655337&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java Sun May 11 09:48:20 2008
@@ -379,4 +379,8 @@
int size() {
return map.size();
}
+
+ Collection<BlockInfo> getBlocks() {
+ return map.values();
+ }
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=655337&r1=655336&r2=655337&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Sun May 11 09:48:20 2008
@@ -1216,6 +1216,20 @@
recentInvalidateSets.put(n.getStorageID(), invalidateSet);
}
invalidateSet.add(b);
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
+ + b.getBlockName() + " is added to invalidSet of " + n.getName());
+ }
+
+ /**
+ * Adds block to list of blocks which will be invalidated on
+ * all its datanodes.
+ */
+ private void addToInvalidates(Block b) {
+ for (Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator(b); it.hasNext();) {
+ DatanodeDescriptor node = it.next();
+ addToInvalidates(b, node);
+ }
}
/**
@@ -1373,14 +1387,7 @@
return false;
}
for (Block b : deletedBlocks) {
- for (Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(b); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- addToInvalidates(b, node);
- NameNode.stateChangeLog.info("BLOCK* NameSystem.delete: "
- + b.getBlockName() + " is added to invalidSet of "
- + node.getName());
- }
+ addToInvalidates(b);
}
if (old.isUnderConstruction()) {
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) old;
@@ -2574,7 +2581,11 @@
if (fileINode.isUnderConstruction()) {
return block;
}
-
+
+ // do not handle mis-replicated blocks during startup
+ if(isInSafeMode())
+ return block;
+
// handle underReplication/overReplication
short fileReplication = fileINode.getReplication();
if (numCurrentReplica >= fileReplication) {
@@ -2588,7 +2599,47 @@
}
return block;
}
-
+
+ /**
+ * For each block in the name-node verify whether it belongs to any file,
+ * over or under replicated. Place it into the respective queue.
+ */
+ private synchronized void processMisReplicatedBlocks() {
+ long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
+ neededReplications.clear();
+ excessReplicateMap.clear();
+ for(BlocksMap.BlockInfo block : blocksMap.getBlocks()) {
+ INodeFile fileINode = block.getINode();
+ if(fileINode == null) {
+ // block does not belong to any file
+ nrInvalid++;
+ addToInvalidates(block);
+ continue;
+ }
+ // calculate current replication
+ short expectedReplication = fileINode.getReplication();
+ NumberReplicas num = countNodes(block);
+ int numCurrentReplica = num.liveReplicas();
+ // add to under-replicated queue if need to be
+ if (neededReplications.add(block,
+ numCurrentReplica,
+ num.decommissionedReplicas(),
+ expectedReplication)) {
+ nrUnderReplicated++;
+ }
+
+ if (numCurrentReplica > expectedReplication) {
+ // over-replicated block
+ nrOverReplicated++;
+ proccessOverReplicatedBlock(block, expectedReplication, null, null);
+ }
+ }
+ LOG.info("Total number of blocks = " + blocksMap.size());
+ LOG.info("Number of invalid blocks = " + nrInvalid);
+ LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
+ LOG.info("Number of over-replicated blocks = " + nrOverReplicated);
+ }
+
/**
* Find how many of the containing nodes are "extra", if any.
* If there are any extras, call chooseExcessReplicates() to
@@ -3335,8 +3386,8 @@
* of blocks in the system, which is the size of
* {@link FSNamesystem#blocksMap}. When the ratio reaches the
* {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
- * to monitor whether the safe mode extension is passed. Then it leaves safe
- * mode and destroys itself.
+ * to monitor whether the safe mode {@link #extension} is passed.
+ * Then it leaves safe mode and destroys itself.
* <p>
* If safe mode is turned on manually then the number of safe blocks is
* not tracked because the name node is not intended to leave safe mode
@@ -3425,9 +3476,12 @@
/**
* Leave safe mode.
- * Switch to manual safe mode if distributed upgrade is required.
+ * <p>
+ * Switch to manual safe mode if distributed upgrade is required.<br>
+ * Check for invalid, under- & over-replicated blocks in the end of startup.
*/
- synchronized void leave(boolean checkForUpgrades) {
+ synchronized void leave(boolean checkForUpgrades,
+ boolean checkBlockReplication) {
if(checkForUpgrades) {
// verify whether a distributed upgrade needs to be started
boolean needUpgrade = false;
@@ -3442,6 +3496,9 @@
return;
}
}
+ if(checkBlockReplication)
+ // verify blocks replications
+ processMisReplicatedBlocks();
long timeInSafemode = now() - systemStart;
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
+ timeInSafemode/1000 + " secs.");
@@ -3503,7 +3560,7 @@
// the threshold is reached
if (!isOn() || // safe mode is off
extension <= 0 || threshold <= 0) { // don't need to wait
- this.leave(true); // leave safe mode
+ this.leave(true, false); // leave safe mode
return;
}
if (reached > 0) { // threshold has already been reached before
@@ -3639,8 +3696,9 @@
} catch (InterruptedException ie) {
}
}
- // leave safe mode an stop the monitor
- safeMode.leave(true);
+ // leave safe mode and stop the monitor
+ if(safeMode != null)
+ safeMode.leave(true, true);
smmthread = null;
}
}
@@ -3658,7 +3716,7 @@
checkSuperuserPrivilege();
switch(action) {
case SAFEMODE_LEAVE: // leave safe mode
- leaveSafeMode(false);
+ leaveSafeMode(false, false);
break;
case SAFEMODE_ENTER: // enter safe mode
enterSafeMode();
@@ -3729,7 +3787,9 @@
* Leave safe mode.
* @throws IOException
*/
- synchronized void leaveSafeMode(boolean checkForUpgrades) throws IOException {
+ synchronized void leaveSafeMode(boolean checkForUpgrades,
+ boolean checkBlockReplication
+ ) throws IOException {
if (!isInSafeMode()) {
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
return;
@@ -3737,7 +3797,7 @@
if(getDistributedUpgradeState())
throw new SafeModeException("Distributed upgrade is in progress",
safeMode);
- safeMode.leave(checkForUpgrades);
+ safeMode.leave(checkForUpgrades, checkBlockReplication);
}
String getSafeModeTip() {
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java?rev=655337&r1=655336&r2=655337&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java Sun May 11 09:48:20 2008
@@ -33,7 +33,16 @@
priorityQueues.add(new TreeSet<Block>());
}
}
-
+
+ /**
+ * Empty the queues.
+ */
+ void clear() {
+ for(int i=0; i<LEVEL; i++) {
+ priorityQueues.get(i).clear();
+ }
+ }
+
/* Return the total number of under replication blocks */
synchronized int size() {
int size = 0;
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java?rev=655337&r1=655336&r2=655337&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java Sun May 11 09:48:20 2008
@@ -101,7 +101,7 @@
FSNamesystem.getFSNamesystem().getFSImage().writeAll();
currentUpgrades = null;
broadcastCommand = null;
- FSNamesystem.getFSNamesystem().leaveSafeMode(false);
+ FSNamesystem.getFSNamesystem().leaveSafeMode(false, true);
}
UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action