Author: eli
Date: Tue Dec 13 01:58:25 2011
New Revision: 1213537
URL: http://svn.apache.org/viewvc?rev=1213537&view=rev
Log:
HDFS-1765. Block Replication should respect under-replication block priority.
Contributed by Uma Maheswara Rao G
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1213537&r1=1213536&r2=1213537&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Dec 13 01:58:25 2011
@@ -233,6 +233,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2590. Fix the missing links in the WebHDFS forrest doc. (szetszwo)
HDFS-2596. TestDirectoryScanner doesn't test parallel scans. (eli)
+
+ HDFS-1765. Block Replication should respect under-replication
+ block priority. (Uma Maheswara Rao G via eli)
Release 0.23.0 - 2011-11-01
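
At a high level, this change moves the selection of under-replicated blocks out of BlockManager and into UnderReplicatedBlocks itself, replacing BlockManager's single global bookmark (the replIndex field removed below) with one bookmark per priority queue, so a scan can resume independently inside each priority list. A minimal sketch of the new bookkeeping, with hypothetical names and String IDs standing in for the real Block and LightWeightLinkedSet types:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch only (hypothetical names): instead of BlockManager's single
    // "private int replIndex", UnderReplicatedBlocks keeps one resumable
    // scan index per priority level.
    class PerPriorityIndexSketch {
      static final int LEVEL = 5; // number of priority queues
      // Stands in for the real List<LightWeightLinkedSet<Block>>.
      final List<List<String>> priorityQueues = new ArrayList<List<String>>();
      // One saved scan position per priority level.
      final Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);

      PerPriorityIndexSketch() {
        for (int i = 0; i < LEVEL; i++) {
          priorityQueues.add(new ArrayList<String>());
          priorityToReplIdx.put(i, 0); // every level starts scanning at 0
        }
      }
    }

How these per-priority indexes are consumed and reset is shown in the UnderReplicatedBlocks diff further down.
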
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1213537&r1=1213536&r2=1213537&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Dec 13 01:58:25 2011
@@ -168,9 +168,6 @@ public class BlockManager {
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
- /** Last block index used for replication work. */
- private int replIndex = 0;
-
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
@@ -923,74 +920,16 @@ public class BlockManager {
* @return number of blocks scheduled for replication during this iteration.
*/
private int computeReplicationWork(int blocksToProcess) throws IOException {
- // Choose the blocks to be replicated
- List<List<Block>> blocksToReplicate =
- chooseUnderReplicatedBlocks(blocksToProcess);
-
- // replicate blocks
- return computeReplicationWorkForBlocks(blocksToReplicate);
- }
-
- /**
- * Get a list of block lists to be replicated The index of block lists
- * represents the
- *
- * @param blocksToProcess
- * @return Return a list of block lists to be replicated. The block list index
- * represents its replication priority.
- */
- private List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
- // initialize data structure for the return value
- List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(
- UnderReplicatedBlocks.LEVEL);
- for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
- blocksToReplicate.add(new ArrayList<Block>());
- }
+ List<List<Block>> blocksToReplicate = null;
namesystem.writeLock();
try {
- synchronized (neededReplications) {
- if (neededReplications.size() == 0) {
- return blocksToReplicate;
- }
-
- // Go through all blocks that need replications.
- UnderReplicatedBlocks.BlockIterator neededReplicationsIterator =
- neededReplications.iterator();
- // skip to the first unprocessed block, which is at replIndex
- for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
- neededReplicationsIterator.next();
- }
- // # of blocks to process equals either twice the number of live
- // data-nodes or the number of under-replicated blocks whichever is less
- blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
-
- for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
- if (!neededReplicationsIterator.hasNext()) {
- // start from the beginning
- replIndex = 0;
- blocksToProcess = Math.min(blocksToProcess, neededReplications
- .size());
- if (blkCnt >= blocksToProcess)
- break;
- neededReplicationsIterator = neededReplications.iterator();
- assert neededReplicationsIterator.hasNext() : "neededReplications should not be empty.";
- }
-
- Block block = neededReplicationsIterator.next();
- int priority = neededReplicationsIterator.getPriority();
- if (priority < 0 || priority >= blocksToReplicate.size()) {
- LOG.warn("Unexpected replication priority: "
- + priority + " " + block);
- } else {
- blocksToReplicate.get(priority).add(block);
- }
- } // end for
- } // end synchronized neededReplication
+ // Choose the blocks to be replicated
+ blocksToReplicate = neededReplications
+ .chooseUnderReplicatedBlocks(blocksToProcess);
} finally {
namesystem.writeUnlock();
}
-
- return blocksToReplicate;
+ return computeReplicationWorkForBlocks(blocksToReplicate);
}
/** Replicate a set of blocks
@@ -1019,7 +958,7 @@ public class BlockManager {
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
+ neededReplications.decrementReplicationIndex(priority);
continue;
}
@@ -1043,7 +982,7 @@ public class BlockManager {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
+ neededReplications.decrementReplicationIndex(priority);
NameNode.stateChangeLog.info("BLOCK* "
+ "Removing block " + block
+ " from neededReplications as it has enough replicas.");
@@ -1104,7 +1043,7 @@ public class BlockManager {
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
- replIndex--;
+ neededReplications.decrementReplicationIndex(priority);
continue;
}
requiredReplication = fileINode.getReplication();
@@ -1118,7 +1057,7 @@ public class BlockManager {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
+ neededReplications.decrementReplicationIndex(priority);
rw.targets = null;
NameNode.stateChangeLog.info("BLOCK* "
+ "Removing block " + block
@@ -1156,7 +1095,7 @@ public class BlockManager {
// remove from neededReplications
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
+ neededReplications.decrementReplicationIndex(priority);
}
}
}
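
A note on the repeated substitution above: each replIndex-- becomes neededReplications.decrementReplicationIndex(priority) because the bookmark now lives per priority, but the invariant is unchanged. When a block that the bookmark has already passed is removed from its queue, the bookmark must shift back by one, or the next scan would skip the element that slid into its place. A hedged, self-contained illustration (hypothetical names, String IDs instead of Blocks):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class BookmarkFixup {
      // Mirrors the intent of decrementReplicationIndex(priority): when a
      // block at or before the bookmark is removed from a priority queue,
      // shift that priority's bookmark back by one.
      static void removeAndFixBookmark(List<String> queue,
          Map<Integer, Integer> bookmark, int priority, String block) {
        if (queue.remove(block)) {
          bookmark.put(priority, bookmark.get(priority) - 1);
        }
      }

      public static void main(String[] args) {
        List<String> q = new ArrayList<String>(Arrays.asList("b0", "b1", "b2", "b3"));
        Map<Integer, Integer> bookmark = new HashMap<Integer, Integer>();
        bookmark.put(0, 2); // b0 and b1 were already handed out for replication
        removeAndFixBookmark(q, bookmark, 0, "b1");
        // The queue is now [b0, b2, b3]. Left at 2, the bookmark would point
        // past "b2" and the next scan would skip it; decremented to 1, the
        // next scan correctly resumes at "b2".
        System.out.println(q + " bookmark=" + bookmark.get(0));
      }
    }
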
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1213537&r1=1213536&r2=1213537&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java Tue Dec 13 01:58:25 2011
@@ -18,8 +18,11 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -81,10 +84,14 @@ class UnderReplicatedBlocks implements I
private List<LightWeightLinkedSet<Block>> priorityQueues
= new ArrayList<LightWeightLinkedSet<Block>>();
+ /** Stores the replication index for each priority */
+ private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
+
/** Create an object. */
UnderReplicatedBlocks() {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new LightWeightLinkedSet<Block>());
+ priorityToReplIdx.put(i, 0);
}
}
@@ -300,6 +307,70 @@ class UnderReplicatedBlocks implements I
}
}
}
+
+ /**
+ * Get a list of block lists to be replicated. The index of each block list
+ * represents its replication priority. The replication index is tracked
+ * separately for each priority list in the priorityToReplIdx map. Iterates
+ * through all priority lists and collects the elements after the replication
+ * index. Once the last priority list reaches its end, all replication indexes
+ * are reset to 0 and iteration starts again from the first priority list to
+ * fulfill the blocksToProcess count.
+ *
+ * @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
+ * @return Return a list of block lists to be replicated. The block list index
+ * represents its replication priority.
+ */
+ public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
+ int blocksToProcess) {
+ // initialize data structure for the return value
+ List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
+ for (int i = 0; i < LEVEL; i++) {
+ blocksToReplicate.add(new ArrayList<Block>());
+ }
+
+ if (size() == 0) { // There are no blocks to collect.
+ return blocksToReplicate;
+ }
+
+ int blockCount = 0;
+ for (int priority = 0; priority < LEVEL; priority++) {
+ // Go through all blocks that need replications with current priority.
+ BlockIterator neededReplicationsIterator = iterator(priority);
+ Integer replIndex = priorityToReplIdx.get(priority);
+
+ // skip to the first unprocessed block, which is at replIndex
+ for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
+ neededReplicationsIterator.next();
+ }
+
+ blocksToProcess = Math.min(blocksToProcess, size());
+
+ if (blockCount == blocksToProcess) {
+ break; // break if already expected blocks are obtained
+ }
+
+ // Loop through all remaining blocks in the list.
+ while (blockCount < blocksToProcess
+ && neededReplicationsIterator.hasNext()) {
+ Block block = neededReplicationsIterator.next();
+ blocksToReplicate.get(priority).add(block);
+ replIndex++;
+ blockCount++;
+ }
+
+ if (!neededReplicationsIterator.hasNext()
+ && neededReplicationsIterator.getPriority() == LEVEL - 1) {
+ // reset all priorities replication index to 0 because there is no
+ // recently added blocks in any list.
+ for (int i = 0; i < LEVEL; i++) {
+ priorityToReplIdx.put(i, 0);
+ }
+ break;
+ }
+ priorityToReplIdx.put(priority, replIndex);
+ }
+ return blocksToReplicate;
+ }
/** returns an iterator of all blocks in a given priority queue */
synchronized BlockIterator iterator(int level) {
@@ -380,4 +451,14 @@ class UnderReplicatedBlocks implements I
return level;
}
}
+
+ /**
+ * Decrements the replication index for the given priority.
+ *
+ * @param priority - int priority level
+ */
+ public void decrementReplicationIndex(int priority) {
+ Integer replIdx = priorityToReplIdx.get(priority);
+ priorityToReplIdx.put(priority, --replIdx);
+ }
}
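
Putting the pieces of the new method together, here is a small runnable model of the scan-and-reset behaviour described in the javadoc above. It is a sketch, not the committed code: String IDs replace Blocks, ArrayLists replace the LightWeightLinkedSet queues, and no locking is modelled.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class ChooseModel {
      static final int LEVEL = 5;
      final List<List<String>> queues = new ArrayList<List<String>>();
      final Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>();

      ChooseModel() {
        for (int i = 0; i < LEVEL; i++) {
          queues.add(new ArrayList<String>());
          priorityToReplIdx.put(i, 0);
        }
      }

      List<List<String>> choose(int blocksToProcess) {
        List<List<String>> out = new ArrayList<List<String>>();
        for (int i = 0; i < LEVEL; i++) {
          out.add(new ArrayList<String>());
        }
        int count = 0;
        for (int priority = 0; priority < LEVEL; priority++) {
          List<String> q = queues.get(priority);
          int idx = priorityToReplIdx.get(priority);
          // Collect blocks after this priority's saved index.
          while (count < blocksToProcess && idx < q.size()) {
            out.get(priority).add(q.get(idx++));
            count++;
          }
          if (priority == LEVEL - 1 && idx >= q.size()) {
            // The last priority list is exhausted: reset every index to 0
            // so the next call starts over from the highest priority.
            for (int i = 0; i < LEVEL; i++) {
              priorityToReplIdx.put(i, 0);
            }
            break;
          }
          priorityToReplIdx.put(priority, idx);
          if (count == blocksToProcess) {
            break;
          }
        }
        return out;
      }

      public static void main(String[] args) {
        ChooseModel m = new ChooseModel();
        for (int p = 0; p < LEVEL; p++) {
          for (int i = 0; i < 5; i++) {
            m.queues.get(p).add("p" + p + "-b" + i); // 5 blocks per priority
          }
        }
        System.out.println(sizes(m.choose(6)));  // [5, 1, 0, 0, 0]
        System.out.println(sizes(m.choose(10))); // [0, 4, 5, 1, 0]
      }

      static List<Integer> sizes(List<List<String>> lists) {
        List<Integer> s = new ArrayList<Integer>();
        for (List<String> l : lists) {
          s.add(l.size());
        }
        return s;
      }
    }

With five blocks in each of the five queues, choose(6) drains priority 0 and dips into priority 1, and choose(10) resumes priority 1 at its saved index; these counts match assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0) and (0, 4, 5, 1, 0) in the new test below.
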
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1213537&r1=1213536&r2=1213537&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Dec 13 01:58:25 2011
@@ -17,26 +17,32 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-
-import junit.framework.TestCase;
+import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.junit.Test;
-public class TestReplicationPolicy extends TestCase {
+public class TestReplicationPolicy {
+ private Random random = DFSUtil.getRandom();
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 6;
private static final Configuration CONF = new HdfsConfiguration();
@@ -90,6 +96,7 @@ public class TestReplicationPolicy exten
* the 1st is on dataNodes[0] and the 2nd is on a different rack.
* @throws Exception
*/
+ @Test
public void testChooseTarget1() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
@@ -150,6 +157,7 @@ public class TestReplicationPolicy exten
* should be placed on a third rack.
* @throws Exception
*/
+ @Test
public void testChooseTarget2() throws Exception {
HashMap<Node, Node> excludedNodes;
DatanodeDescriptor[] targets;
@@ -225,6 +233,7 @@ public class TestReplicationPolicy exten
* and the rest should be placed on the third rack.
* @throws Exception
*/
+ @Test
public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
dataNodes[0].updateHeartbeat(
@@ -278,6 +287,7 @@ public class TestReplicationPolicy exten
* the 3rd replica should be placed on the same rack as the 1st replica,
* @throws Exception
*/
+ @Test
public void testChoooseTarget4() throws Exception {
// make data node 0 & 1 to be not qualified to choose: not enough disk space
for(int i=0; i<2; i++) {
@@ -325,6 +335,7 @@ public class TestReplicationPolicy exten
* the 3rd replica should be placed on the same rack as the 2nd replica,
* @throws Exception
*/
+ @Test
public void testChooseTarget5() throws Exception {
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
@@ -354,6 +365,7 @@ public class TestReplicationPolicy exten
* the 1st replica. The 3rd replica can be placed randomly.
* @throws Exception
*/
+ @Test
public void testRereplicate1() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@@ -388,6 +400,7 @@ public class TestReplicationPolicy exten
* the rest replicas can be placed randomly,
* @throws Exception
*/
+ @Test
public void testRereplicate2() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@@ -417,6 +430,7 @@ public class TestReplicationPolicy exten
* the rest replicas can be placed randomly,
* @throws Exception
*/
+ @Test
public void testRereplicate3() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@@ -450,4 +464,122 @@ public class TestReplicationPolicy exten
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
}
+ /**
+ * Test that high priority blocks are processed before low priority
+ * blocks.
+ */
+ @Test(timeout = 60000)
+ public void testReplicationWithPriority() throws Exception {
+ int DFS_NAMENODE_REPLICATION_INTERVAL = 1000;
+ int HIGH_PRIORITY = 0;
+ Configuration conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .format(true).build();
+ try {
+ cluster.waitActive();
+ final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
+ .getNameNode().getNamesystem().getBlockManager().neededReplications;
+ for (int i = 0; i < 100; i++) {
+ // Adding the blocks directly to normal priority
+ neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
+ }
+ // Let's wait for the replication interval, so that processing of the
+ // normal priority blocks starts
+ Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
+
+ // Adding the block directly to high priority list
+ neededReplications.add(new Block(random.nextLong()), 1, 0, 3);
+
+ // Let's wait for the replication interval
+ Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
+
+ // Check replication completed successfully. Need not wait till it processes
+ // all the 100 normal blocks.
+ assertFalse("Not able to clear the element from high priority list",
+ neededReplications.iterator(HIGH_PRIORITY).hasNext());
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Test that chooseUnderReplicatedBlocks returns blocks in priority order
+ */
+ @Test
+ public void testChooseUnderReplicatedBlocks() throws Exception {
+ UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
+
+ for (int i = 0; i < 5; i++) {
+ // Adding QUEUE_HIGHEST_PRIORITY block
+ underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
+
+ // Adding QUEUE_VERY_UNDER_REPLICATED block
+ underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
+
+ // Adding QUEUE_UNDER_REPLICATED block
+ underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
+
+ // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
+ underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
+
+ // Adding QUEUE_WITH_CORRUPT_BLOCKS block
+ underReplicatedBlocks.add(new Block(random.nextLong()), 0, 0, 3);
+ }
+
+ // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
+ // from QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
+ List<List<Block>> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
+ assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
+
+ // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from
+ // QUEUE_VERY_UNDER_REPLICATED, 5 blocks from QUEUE_UNDER_REPLICATED and 1
+ // block from QUEUE_REPLICAS_BADLY_DISTRIBUTED.
+ chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
+ assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
+
+ // Adding QUEUE_HIGHEST_PRIORITY
+ underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
+
+ // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
+ // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
+ // and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
+ chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
+ assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5);
+
+ // Since the end of all the lists has been reached, picking should
+ // start again from the beginning.
+ // Choose 7 blocks from UnderReplicatedBlocks. Then it should pick 6 blocks from
+ // QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_UNDER_REPLICATED.
+ chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(7);
+ assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0);
+ }
+
+ /** Asserts the chosen blocks against the expected number per priority. */
+ private void assertTheChosenBlocks(
+ List<List<Block>> chosenBlocks, int firstPrioritySize,
+ int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
+ int fifthPrioritySize) {
+ assertEquals(
+ "Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
+ firstPrioritySize, chosenBlocks.get(
+ UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
+ assertEquals(
+ "Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED
blocks",
+ secondPrioritySize, chosenBlocks.get(
+ UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
+ assertEquals(
+ "Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
+ thirdPrioritySize, chosenBlocks.get(
+ UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
+ assertEquals(
+ "Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED
blocks",
+ fourthPrioritySize, chosenBlocks.get(
+ UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
+ assertEquals(
+ "Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
+ fifthPrioritySize, chosenBlocks.get(
+ UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
+ }
}
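
For readers cross-checking the assertions above: the test steers blocks into particular queues purely through the (curReplicas, decommissionedReplicas, expectedReplicas) arguments of UnderReplicatedBlocks.add. As labelled by the test's own comments, and in ascending queue-index order as implied by assertTheChosenBlocks:

    add(block, 1, 0, 3)  ->  QUEUE_HIGHEST_PRIORITY
    add(block, 2, 0, 7)  ->  QUEUE_VERY_UNDER_REPLICATED
    add(block, 6, 0, 6)  ->  QUEUE_UNDER_REPLICATED
    add(block, 5, 0, 6)  ->  QUEUE_REPLICAS_BADLY_DISTRIBUTED
    add(block, 0, 0, 3)  ->  QUEUE_WITH_CORRUPT_BLOCKS

The exact mapping is decided inside UnderReplicatedBlocks when a block is added; this commit does not change that logic.
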
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java?rev=1213537&r1=1213536&r2=1213537&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java Tue Dec 13 01:58:25 2011
@@ -145,9 +145,7 @@ public class TestNameNodeMetrics extends
fs.delete(file, true);
filesTotal--; // reduce the filecount for deleted file
- // Wait for more than DATANODE_COUNT replication intervals to ensure all
- // the blocks pending deletion are sent for deletion to the datanodes.
- Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+ waitForDeletion();
updateMetrics();
rb = getMetrics(NS_METRICS);
assertGauge("FilesTotal", filesTotal, rb);
@@ -176,7 +174,7 @@ public class TestNameNodeMetrics extends
assertGauge("PendingReplicationBlocks", 1L, rb);
assertGauge("ScheduledReplicationBlocks", 1L, rb);
fs.delete(file, true);
- updateMetrics();
+ waitForDeletion();
rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 0L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb);
@@ -212,9 +210,15 @@ public class TestNameNodeMetrics extends
assertGauge("UnderReplicatedBlocks", 1L, rb);
assertGauge("MissingBlocks", 1L, rb);
fs.delete(file, true);
- updateMetrics();
+ waitForDeletion();
assertGauge("UnderReplicatedBlocks", 0L, getMetrics(NS_METRICS));
}
+
+ private void waitForDeletion() throws InterruptedException {
+ // Wait for more than DATANODE_COUNT replication intervals to ensure all
+ // the blocks pending deletion are sent for deletion to the datanodes.
+ Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+ }
public void testRenameMetrics() throws Exception {
Path src = getTestPath("src");