http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 038b6ce..5075c05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -110,10 +110,13 @@ import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder; +import org.apache.hadoop.hdfs.protocol.ECBlockGroupsStats; +import org.apache.hadoop.hdfs.protocol.BlocksStats; import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -1651,6 +1654,50 @@ public class DFSTestUtil { } /** + * Verify the aggregated {@link ClientProtocol#getStats()} block counts equal + * the sum of {@link ClientProtocol#getBlocksStats()} and + * {@link ClientProtocol#getECBlockGroupsStats()}. 
+ * @throws Exception + */ + public static void verifyClientStats(Configuration conf, + MiniDFSCluster cluster) throws Exception { + ClientProtocol client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), + ClientProtocol.class).getProxy(); + long[] aggregatedStats = cluster.getNameNode().getRpcServer().getStats(); + BlocksStats blocksStats = + client.getBlocksStats(); + ECBlockGroupsStats ecBlockGroupsStats = client.getECBlockGroupsStats(); + + assertEquals("Under replicated stats not matching!", + aggregatedStats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX], + aggregatedStats[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX]); + assertEquals("Low redundancy stats not matching!", + aggregatedStats[ClientProtocol.GET_STATS_LOW_REDUNDANCY_IDX], + blocksStats.getLowRedundancyBlocksStat() + + ecBlockGroupsStats.getLowRedundancyBlockGroupsStat()); + assertEquals("Corrupt blocks stats not matching!", + aggregatedStats[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX], + blocksStats.getCorruptBlocksStat() + + ecBlockGroupsStats.getCorruptBlockGroupsStat()); + assertEquals("Missing blocks stats not matching!", + aggregatedStats[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX], + blocksStats.getMissingReplicaBlocksStat() + + ecBlockGroupsStats.getMissingBlockGroupsStat()); + assertEquals("Missing blocks with replication factor one not matching!", + aggregatedStats[ClientProtocol.GET_STATS_MISSING_REPL_ONE_BLOCKS_IDX], + blocksStats.getMissingReplicationOneBlocksStat()); + assertEquals("Bytes in future blocks stats not matching!", + aggregatedStats[ClientProtocol.GET_STATS_BYTES_IN_FUTURE_BLOCKS_IDX], + blocksStats.getBytesInFutureBlocksStat() + + ecBlockGroupsStats.getBytesInFutureBlockGroupsStat()); + assertEquals("Pending deletion blocks stats not matching!", + aggregatedStats[ClientProtocol.GET_STATS_PENDING_DELETION_BLOCKS_IDX], + blocksStats.getPendingDeletionBlocksStat() + + ecBlockGroupsStats.getPendingDeletionBlockGroupsStat()); + } + + /** * Helper 
function to create a key in the Key Provider. Defaults * to the first indexed NameNode's Key Provider. *
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java index 1d9d402..3e9d812 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java @@ -281,7 +281,7 @@ public class TestFileCorruption { @Override public Boolean get() { try { return cluster.getNamesystem().getBlockManager() - .getUnderReplicatedBlocksCount() == 1; + .getLowRedundancyBlocksCount() == 1; } catch (Exception e) { e.printStackTrace(); return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java index e0dfb4a..7c536e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java @@ -549,7 +549,7 @@ public class TestMaintenanceState extends AdminStatesBaseTest { FileSystem fileSys = getCluster().getFileSystem(0); FSNamesystem ns = getCluster().getNamesystem(0); - writeFile(fileSys, file, replicas, 1); + writeFile(fileSys, file, replicas, 25); DatanodeInfo nodeOutofService = takeNodeOutofService(0, getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null, 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java index e225141..ca2fe92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMissingBlocksAlert.java @@ -98,7 +98,7 @@ public class TestMissingBlocksAlert { Thread.sleep(100); } assertTrue(dfs.getMissingBlocksCount() == 1); - assertEquals(4, dfs.getUnderReplicatedBlocksCount()); + assertEquals(4, dfs.getLowRedundancyBlocksCount()); assertEquals(3, bm.getUnderReplicatedNotMissingBlocks()); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -117,7 +117,7 @@ public class TestMissingBlocksAlert { Thread.sleep(100); } - assertEquals(2, dfs.getUnderReplicatedBlocksCount()); + assertEquals(2, dfs.getLowRedundancyBlocksCount()); assertEquals(2, bm.getUnderReplicatedNotMissingBlocks()); Assert.assertEquals(0, (long)(Long) mbs.getAttribute(mxbeanName, http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java index 61e69f3..2413918 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java @@ -17,18 +17,24 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import java.util.Random; import java.util.UUID; import static org.junit.Assert.assertEquals; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -53,10 +59,19 @@ public class TestComputeInvalidateWork { private FSNamesystem namesystem; private BlockManager bm; private DatanodeDescriptor[] nodes; + private ErasureCodingPolicy ecPolicy; + private DistributedFileSystem fs; + private Path ecFile; + private int totalBlockGroups, blockGroupSize, stripesPerBlock, cellSize; + private LocatedStripedBlock locatedStripedBlock; @Before public void setup() throws Exception { + ecPolicy = SystemErasureCodingPolicies.getByID( + SystemErasureCodingPolicies.XOR_2_1_POLICY_ID); conf = new HdfsConfiguration(); + conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, + ecPolicy.getName()); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES) 
.build(); cluster.waitActive(); @@ -65,6 +80,25 @@ public class TestComputeInvalidateWork { nodes = bm.getDatanodeManager().getHeartbeatManager().getDatanodes(); BlockManagerTestUtil.stopRedundancyThread(bm); assertEquals(nodes.length, NUM_OF_DATANODES); + + // Create a striped file + Path ecDir = new Path("/ec"); + fs = cluster.getFileSystem(); + fs.mkdirs(ecDir); + fs.getClient().setErasureCodingPolicy(ecDir.toString(), ecPolicy.getName()); + ecFile = new Path(ecDir, "ec-file"); + stripesPerBlock = 2; + cellSize = ecPolicy.getCellSize(); + int blockSize = stripesPerBlock * cellSize; + blockGroupSize = ecPolicy.getNumDataUnits() * blockSize; + totalBlockGroups = 4; + DFSTestUtil.createStripedFile(cluster, ecFile, ecDir, totalBlockGroups, + stripesPerBlock, false, ecPolicy); + LocatedBlocks lbs = cluster.getFileSystem().getClient(). + getNamenode().getBlockLocations( + ecFile.toString(), 0, blockGroupSize); + assert lbs.get(0) instanceof LocatedStripedBlock; + locatedStripedBlock = (LocatedStripedBlock)(lbs.get(0)); } @After @@ -75,12 +109,28 @@ public class TestComputeInvalidateWork { } } + private void verifyInvalidationWorkCounts(int blockInvalidateLimit) { + assertEquals(blockInvalidateLimit * NUM_OF_DATANODES, + bm.computeInvalidateWork(NUM_OF_DATANODES + 1)); + assertEquals(blockInvalidateLimit * NUM_OF_DATANODES, + bm.computeInvalidateWork(NUM_OF_DATANODES)); + assertEquals(blockInvalidateLimit * (NUM_OF_DATANODES - 1), + bm.computeInvalidateWork(NUM_OF_DATANODES - 1)); + int workCount = bm.computeInvalidateWork(1); + if (workCount == 1) { + assertEquals(blockInvalidateLimit + 1, bm.computeInvalidateWork(2)); + } else { + assertEquals(workCount, blockInvalidateLimit); + assertEquals(2, bm.computeInvalidateWork(2)); + } + } + /** * Test if {@link BlockManager#computeInvalidateWork(int)} - * can schedule invalidate work correctly + * can schedule invalidate work correctly for the replicas. 
*/ @Test(timeout=120000) - public void testCompInvalidate() throws Exception { + public void testComputeInvalidateReplicas() throws Exception { final int blockInvalidateLimit = bm.getDatanodeManager() .getBlockInvalidateLimit(); namesystem.writeLock(); @@ -92,20 +142,66 @@ public class TestComputeInvalidateWork { bm.addToInvalidates(block, nodes[i]); } } - - assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, - bm.computeInvalidateWork(NUM_OF_DATANODES+1)); - assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, - bm.computeInvalidateWork(NUM_OF_DATANODES)); - assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1), - bm.computeInvalidateWork(NUM_OF_DATANODES-1)); - int workCount = bm.computeInvalidateWork(1); - if (workCount == 1) { - assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2)); - } else { - assertEquals(workCount, blockInvalidateLimit); - assertEquals(2, bm.computeInvalidateWork(2)); + verifyInvalidationWorkCounts(blockInvalidateLimit); + } finally { + namesystem.writeUnlock(); + } + } + + /** + * Test if {@link BlockManager#computeInvalidateWork(int)} + * can schedule invalidate work correctly for the striped block groups. 
+ */ + @Test(timeout=120000) + public void testComputeInvalidateStripedBlockGroups() throws Exception { + final int blockInvalidateLimit = + bm.getDatanodeManager().getBlockInvalidateLimit(); + namesystem.writeLock(); + try { + int nodeCount = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(); + for (int i = 0; i < nodeCount; i++) { + for(int j = 0; j < 3 * blockInvalidateLimit + 1; j++) { + Block blk = new Block(locatedStripedBlock.getBlock().getBlockId() + + (i * 10 + j), stripesPerBlock * cellSize, + locatedStripedBlock.getBlock().getGenerationStamp()); + bm.addToInvalidates(blk, nodes[i]); + } + } + verifyInvalidationWorkCounts(blockInvalidateLimit); + } finally { + namesystem.writeUnlock(); + } + } + + /** + * Test if {@link BlockManager#computeInvalidateWork(int)} + * can schedule invalidate work correctly for both replicas and striped + * block groups, combined. + */ + @Test(timeout=120000) + public void testComputeInvalidate() throws Exception { + final int blockInvalidateLimit = + bm.getDatanodeManager().getBlockInvalidateLimit(); + final Random random = new Random(System.currentTimeMillis()); + namesystem.writeLock(); + try { + int nodeCount = ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits(); + for (int i = 0; i < nodeCount; i++) { + for(int j = 0; j < 3 * blockInvalidateLimit + 1; j++) { + if (random.nextBoolean()) { + Block stripedBlock = new Block( + locatedStripedBlock.getBlock().getBlockId() + (i * 10 + j), + stripesPerBlock * cellSize, + locatedStripedBlock.getBlock().getGenerationStamp()); + bm.addToInvalidates(stripedBlock, nodes[i]); + } else { + Block replica = new Block(i * (blockInvalidateLimit + 1) + j, 0, + GenerationStamp.LAST_RESERVED_STAMP); + bm.addToInvalidates(replica, nodes[i]); + } + } } + verifyInvalidationWorkCounts(blockInvalidateLimit); } finally { namesystem.writeUnlock(); } @@ -129,6 +225,11 @@ public class TestComputeInvalidateWork { Block block = new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP); 
bm.addToInvalidates(block, nodes[0]); + Block stripedBlock = new Block( + locatedStripedBlock.getBlock().getBlockId() + 100, + stripesPerBlock * cellSize, + locatedStripedBlock.getBlock().getGenerationStamp()); + bm.addToInvalidates(stripedBlock, nodes[0]); bm.getDatanodeManager().registerDatanode(dnr); // Since UUID has changed, the invalidation work should be skipped @@ -145,26 +246,37 @@ public class TestComputeInvalidateWork { final DistributedFileSystem dfs = cluster.getFileSystem(); final Path path = new Path("/testRR"); // Create a file and shutdown the DNs, which populates InvalidateBlocks + short totalReplicas = NUM_OF_DATANODES; DFSTestUtil.createFile(dfs, path, dfs.getDefaultBlockSize(), - (short) NUM_OF_DATANODES, 0xED0ED0); + totalReplicas, 0xED0ED0); DFSTestUtil.waitForReplication(dfs, path, (short) NUM_OF_DATANODES, 12000); for (DataNode dn : cluster.getDataNodes()) { dn.shutdown(); } dfs.delete(path, false); + dfs.delete(ecFile, false); namesystem.writeLock(); InvalidateBlocks invalidateBlocks; - int expected = NUM_OF_DATANODES; + int totalStripedDataBlocks = totalBlockGroups * (ecPolicy.getNumDataUnits() + + ecPolicy.getNumParityUnits()); + int expected = totalReplicas + totalStripedDataBlocks; try { invalidateBlocks = (InvalidateBlocks) Whitebox .getInternalState(cluster.getNamesystem().getBlockManager(), "invalidateBlocks"); - assertEquals("Expected invalidate blocks to be the number of DNs", + assertEquals("Invalidate blocks should include both Replicas and " + + "Striped BlockGroups!", (long) expected, invalidateBlocks.numBlocks()); + assertEquals("Unexpected invalidate count for replicas!", + totalReplicas, invalidateBlocks.getBlocksStat()); + assertEquals("Unexpected invalidate count for striped block groups!", + totalStripedDataBlocks, invalidateBlocks.getECBlockGroupsStat()); } finally { namesystem.writeUnlock(); } // Re-register each DN and see that it wipes the invalidation work + int totalBlockGroupsPerDataNode = totalBlockGroups; + int 
totalReplicasPerDataNode = totalReplicas / NUM_OF_DATANODES; for (DataNode dn : cluster.getDataNodes()) { DatanodeID did = dn.getDatanodeId(); DatanodeRegistration reg = new DatanodeRegistration( @@ -175,7 +287,7 @@ public class TestComputeInvalidateWork { namesystem.writeLock(); try { bm.getDatanodeManager().registerDatanode(reg); - expected--; + expected -= (totalReplicasPerDataNode + totalBlockGroupsPerDataNode); assertEquals("Expected number of invalidate blocks to decrease", (long) expected, invalidateBlocks.numBlocks()); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java index 4bdaaac..3f8a5cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java @@ -25,14 +25,13 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockType; import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason; import org.junit.Test; @@ -45,88 +44,130 @@ import org.junit.Test; */ public class TestCorruptReplicaInfo { - private static final Log LOG = - 
LogFactory.getLog(TestCorruptReplicaInfo.class); - - private final Map<Long, Block> block_map = - new HashMap<Long, Block>(); - - // Allow easy block creation by block id - // Return existing block if one with same block id already exists - private Block getBlock(Long block_id) { - if (!block_map.containsKey(block_id)) { - block_map.put(block_id, new Block(block_id,0,0)); + private static final Log LOG = LogFactory.getLog( + TestCorruptReplicaInfo.class); + private final Map<Long, Block> replicaMap = new HashMap<>(); + private final Map<Long, Block> stripedBlocksMap = new HashMap<>(); + + // Allow easy block creation by block id. Return existing + // replica block if one with same block id already exists. + private Block getReplica(Long blockId) { + if (!replicaMap.containsKey(blockId)) { + replicaMap.put(blockId, new Block(blockId, 0, 0)); } - - return block_map.get(block_id); + return replicaMap.get(blockId); } - - private Block getBlock(int block_id) { - return getBlock((long)block_id); + + private Block getReplica(int blkId) { + return getReplica(Long.valueOf(blkId)); + } + + private Block getStripedBlock(int blkId) { + Long stripedBlockId = (1L << 63) + blkId; + assertTrue(BlockIdManager.isStripedBlockID(stripedBlockId)); + if (!stripedBlocksMap.containsKey(stripedBlockId)) { + stripedBlocksMap.put(stripedBlockId, new Block(stripedBlockId, 1024, 0)); + } + return stripedBlocksMap.get(stripedBlockId); + } + + private void verifyCorruptBlocksCount(CorruptReplicasMap corruptReplicasMap, + long expectedReplicaCount, long expectedStripedBlockCount) { + long totalExpectedCorruptBlocks = expectedReplicaCount + + expectedStripedBlockCount; + assertEquals("Unexpected total corrupt blocks count!", + totalExpectedCorruptBlocks, corruptReplicasMap.size()); + assertEquals("Unexpected replica blocks count!", + expectedReplicaCount, corruptReplicasMap.getCorruptBlocksStat()); + assertEquals("Unexpected striped blocks count!", + expectedStripedBlockCount, + 
corruptReplicasMap.getCorruptECBlockGroupsStat()); } @Test - public void testCorruptReplicaInfo() throws IOException, - InterruptedException { - - CorruptReplicasMap crm = new CorruptReplicasMap(); - - // Make sure initial values are returned correctly - assertEquals("Number of corrupt blocks must initially be 0", 0, crm.size()); - assertNull("Param n cannot be less than 0", crm.getCorruptReplicaBlockIdsForTesting(-1, null)); - assertNull("Param n cannot be greater than 100", crm.getCorruptReplicaBlockIdsForTesting(101, null)); - long[] l = crm.getCorruptReplicaBlockIdsForTesting(0, null); - assertNotNull("n = 0 must return non-null", l); - assertEquals("n = 0 must return an empty list", 0, l.length); - - // create a list of block_ids. A list is used to allow easy validation of the - // output of getCorruptReplicaBlockIds - int NUM_BLOCK_IDS = 140; - List<Long> block_ids = new LinkedList<Long>(); - for (int i=0;i<NUM_BLOCK_IDS;i++) { - block_ids.add((long)i); - } - - DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor(); - DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor(); - - addToCorruptReplicasMap(crm, getBlock(0), dn1); - assertEquals("Number of corrupt blocks not returning correctly", - 1, crm.size()); - addToCorruptReplicasMap(crm, getBlock(1), dn1); - assertEquals("Number of corrupt blocks not returning correctly", - 2, crm.size()); - - addToCorruptReplicasMap(crm, getBlock(1), dn2); - assertEquals("Number of corrupt blocks not returning correctly", - 2, crm.size()); - - crm.removeFromCorruptReplicasMap(getBlock(1)); - assertEquals("Number of corrupt blocks not returning correctly", - 1, crm.size()); - - crm.removeFromCorruptReplicasMap(getBlock(0)); - assertEquals("Number of corrupt blocks not returning correctly", - 0, crm.size()); - - for (Long block_id: block_ids) { - addToCorruptReplicasMap(crm, getBlock(block_id), dn1); - } - - assertEquals("Number of corrupt blocks not returning correctly", - NUM_BLOCK_IDS, crm.size()); - - 
assertTrue("First five block ids not returned correctly ", - Arrays.equals(new long[]{0,1,2,3,4}, - crm.getCorruptReplicaBlockIdsForTesting(5, null))); - - LOG.info(crm.getCorruptReplicaBlockIdsForTesting(10, 7L)); - LOG.info(block_ids.subList(7, 18)); - - assertTrue("10 blocks after 7 not returned correctly ", - Arrays.equals(new long[]{8,9,10,11,12,13,14,15,16,17}, - crm.getCorruptReplicaBlockIdsForTesting(10, 7L))); - + public void testCorruptReplicaInfo() + throws IOException, InterruptedException { + CorruptReplicasMap crm = new CorruptReplicasMap(); + + // Make sure initial values are returned correctly + assertEquals("Total number of corrupt blocks must initially be 0!", + 0, crm.size()); + assertEquals("Number of corrupt replicas must initially be 0!", + 0, crm.getCorruptBlocksStat()); + assertEquals("Number of corrupt striped block groups must initially be 0!", + 0, crm.getCorruptECBlockGroupsStat()); + assertNull("Param n cannot be less than 0", + crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, -1, null)); + assertNull("Param n cannot be greater than 100", + crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 101, null)); + long[] l = crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 0, null); + assertNotNull("n = 0 must return non-null", l); + assertEquals("n = 0 must return an empty list", 0, l.length); + + // Create a list of block ids. A list is used to allow easy + // validation of the output of getCorruptReplicaBlockIds. + final int blockCount = 140; + long[] replicaIds = new long[blockCount]; + long[] stripedIds = new long[blockCount]; + for (int i = 0; i < blockCount; i++) { + replicaIds[i] = getReplica(i).getBlockId(); + stripedIds[i] = getStripedBlock(i).getBlockId(); + } + + DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor(); + DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor(); + + // Add to corrupt blocks map. 
+ // Replicas + addToCorruptReplicasMap(crm, getReplica(0), dn1); + verifyCorruptBlocksCount(crm, 1, 0); + addToCorruptReplicasMap(crm, getReplica(1), dn1); + verifyCorruptBlocksCount(crm, 2, 0); + addToCorruptReplicasMap(crm, getReplica(1), dn2); + verifyCorruptBlocksCount(crm, 2, 0); + + // Striped blocks + addToCorruptReplicasMap(crm, getStripedBlock(0), dn1); + verifyCorruptBlocksCount(crm, 2, 1); + addToCorruptReplicasMap(crm, getStripedBlock(1), dn1); + verifyCorruptBlocksCount(crm, 2, 2); + addToCorruptReplicasMap(crm, getStripedBlock(1), dn2); + verifyCorruptBlocksCount(crm, 2, 2); + + // Remove from corrupt blocks map. + // Replicas + crm.removeFromCorruptReplicasMap(getReplica(1)); + verifyCorruptBlocksCount(crm, 1, 2); + crm.removeFromCorruptReplicasMap(getReplica(0)); + verifyCorruptBlocksCount(crm, 0, 2); + + // Striped blocks + crm.removeFromCorruptReplicasMap(getStripedBlock(1)); + verifyCorruptBlocksCount(crm, 0, 1); + crm.removeFromCorruptReplicasMap(getStripedBlock(0)); + verifyCorruptBlocksCount(crm, 0, 0); + + for (int blockId = 0; blockId < blockCount; blockId++) { + addToCorruptReplicasMap(crm, getReplica(blockId), dn1); + addToCorruptReplicasMap(crm, getStripedBlock(blockId), dn1); + } + + assertEquals("Number of corrupt blocks not returning correctly", + 2 * blockCount, crm.size()); + assertTrue("First five corrupt replica blocks ids are not right!", + Arrays.equals(Arrays.copyOfRange(replicaIds, 0, 5), + crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 5, null))); + assertTrue("First five corrupt striped blocks ids are not right!", + Arrays.equals(Arrays.copyOfRange(stripedIds, 0, 5), + crm.getCorruptBlockIdsForTesting(BlockType.STRIPED, 5, null))); + + assertTrue("10 replica blocks after 7 not returned correctly!", + Arrays.equals(Arrays.copyOfRange(replicaIds, 7, 17), + crm.getCorruptBlockIdsForTesting(BlockType.CONTIGUOUS, 10, 7L))); + assertTrue("10 striped blocks after 7 not returned correctly!", + 
Arrays.equals(Arrays.copyOfRange(stripedIds, 7, 17), + crm.getCorruptBlockIdsForTesting(BlockType.STRIPED, + 10, getStripedBlock(7).getBlockId()))); } private static void addToCorruptReplicasMap(CorruptReplicasMap crm, http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java index d853762..c65fc64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java @@ -45,9 +45,32 @@ public class TestLowRedundancyBlockQueues { return sblk; } + private void verifyBlockStats(LowRedundancyBlocks queues, + int lowRedundancyReplicaCount, int corruptReplicaCount, + int corruptReplicationOneCount, int lowRedundancyStripedCount, + int corruptStripedCount) { + assertEquals("Low redundancy replica count incorrect!", + lowRedundancyReplicaCount, queues.getLowRedundancyBlocksStat()); + assertEquals("Corrupt replica count incorrect!", + corruptReplicaCount, queues.getCorruptBlocksStat()); + assertEquals("Corrupt replica one count incorrect!", + corruptReplicationOneCount, + queues.getCorruptReplicationOneBlocksStat()); + assertEquals("Low redundancy striped blocks count incorrect!", + lowRedundancyStripedCount, queues.getLowRedundancyECBlockGroupsStat()); + assertEquals("Corrupt striped blocks count incorrect!", + corruptStripedCount, queues.getCorruptECBlockGroupsStat()); + assertEquals("Low Redundancy count incorrect!", 
+ lowRedundancyReplicaCount + lowRedundancyStripedCount, + queues.getLowRedundancyBlockCount()); + assertEquals("LowRedundancyBlocks queue size incorrect!", + (lowRedundancyReplicaCount + corruptReplicaCount + + lowRedundancyStripedCount + corruptStripedCount), queues.size()); + } + /** * Test that adding blocks with different replication counts puts them - * into different queues + * into different queues. * @throws Throwable if something goes wrong */ @Test @@ -59,43 +82,45 @@ public class TestLowRedundancyBlockQueues { BlockInfo block_corrupt = genBlockInfo(4); BlockInfo block_corrupt_repl_one = genBlockInfo(5); - //add a block with a single entry + // Add a block with a single entry assertAdded(queues, block1, 1, 0, 3); - - assertEquals(1, queues.getLowRedundancyBlockCount()); - assertEquals(1, queues.size()); assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY); - //repeated additions fail + verifyBlockStats(queues, 1, 0, 0, 0, 0); + + // Repeated additions fail assertFalse(queues.add(block1, 1, 0, 0, 3)); + verifyBlockStats(queues, 1, 0, 0, 0, 0); - //add a second block with two replicas + // Add a second block with two replicas assertAdded(queues, block2, 2, 0, 3); - assertEquals(2, queues.getLowRedundancyBlockCount()); - assertEquals(2, queues.size()); assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY); - //now try to add a block that is corrupt + verifyBlockStats(queues, 2, 0, 0, 0, 0); + + // Now try to add a block that is corrupt assertAdded(queues, block_corrupt, 0, 0, 3); - assertEquals(3, queues.size()); - assertEquals(2, queues.getLowRedundancyBlockCount()); - assertEquals(1, queues.getCorruptBlockSize()); assertInLevel(queues, block_corrupt, LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS); + verifyBlockStats(queues, 2, 1, 0, 0, 0); - //insert a very insufficiently redundancy block + // Insert a very insufficiently redundancy block assertAdded(queues, block_very_low_redundancy, 4, 0, 25); assertInLevel(queues, 
block_very_low_redundancy, LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY); + verifyBlockStats(queues, 3, 1, 0, 0, 0); - //insert a corrupt block with replication factor 1 + // Insert a corrupt block with replication factor 1 assertAdded(queues, block_corrupt_repl_one, 0, 0, 1); - assertEquals(2, queues.getCorruptBlockSize()); - assertEquals(1, queues.getCorruptReplOneBlockSize()); + verifyBlockStats(queues, 3, 2, 1, 0, 0); + + // Bump up the expected count for corrupt replica one block from 1 to 3 queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2); - assertEquals(0, queues.getCorruptReplOneBlockSize()); + verifyBlockStats(queues, 3, 2, 0, 0, 0); + + // Reduce the expected replicas to 1 queues.update(block_corrupt, 0, 0, 0, 1, 0, -2); - assertEquals(1, queues.getCorruptReplOneBlockSize()); + verifyBlockStats(queues, 3, 2, 1, 0, 0); queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24); - assertEquals(2, queues.getCorruptReplOneBlockSize()); + verifyBlockStats(queues, 2, 3, 2, 0, 0); } @Test @@ -131,16 +156,18 @@ public class TestLowRedundancyBlockQueues { assertInLevel(queues, block, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY); } + verifyBlockStats(queues, 0, 0, 0, numUR, 0); } // add a corrupted block BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes); assertEquals(numCorrupt, queues.getCorruptBlockSize()); + verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt); + assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize); numCorrupt++; - assertEquals(numUR + numCorrupt, queues.size()); - assertEquals(numUR, queues.getLowRedundancyBlockCount()); - assertEquals(numCorrupt, queues.getCorruptBlockSize()); + verifyBlockStats(queues, 0, 0, 0, numUR, numCorrupt); + assertInLevel(queues, block_corrupt, LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java index c0b54b0..e21d44e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java @@ -37,38 +37,51 @@ import java.util.Iterator; public class TestUnderReplicatedBlocks { - @Test(timeout=60000) // 1 min timeout - public void testSetrepIncWithUnderReplicatedBlocks() throws Exception { + @Test(timeout=120000) // 2 min timeout + public void testSetRepIncWithUnderReplicatedBlocks() throws Exception { Configuration conf = new HdfsConfiguration(); final short REPLICATION_FACTOR = 2; final String FILE_NAME = "/testFile"; final Path FILE_PATH = new Path(FILE_NAME); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION_FACTOR + 1).build(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf). 
+ numDataNodes(REPLICATION_FACTOR + 1).build(); try { // create a file with one block with a replication factor of 2 final FileSystem fs = cluster.getFileSystem(); + final BlockManager bm = cluster.getNamesystem().getBlockManager(); DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L); DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR); - + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); + // remove one replica from the blocksMap so block becomes under-replicated // but the block does not get put into the under-replicated blocks queue - final BlockManager bm = cluster.getNamesystem().getBlockManager(); ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH); DatanodeDescriptor dn = bm.blocksMap.getStorages(b.getLocalBlock()) .iterator().next().getDatanodeDescriptor(); bm.addToInvalidates(b.getLocalBlock(), dn); + + // Compute the invalidate work in NN, and trigger the heartbeat from DN BlockManagerTestUtil.computeAllPendingWork(bm); DataNodeTestUtils.triggerHeartbeat(cluster.getDataNode(dn.getIpcPort())); // Wait to make sure the DataNode receives the deletion request Thread.sleep(5000); + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); + // Remove the record from blocksMap bm.blocksMap.removeNode(b.getLocalBlock(), dn); - + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); + // increment this file's replication factor FsShell shell = new FsShell(conf); - assertEquals(0, shell.run(new String[]{ - "-setrep", "-w", Integer.toString(1+REPLICATION_FACTOR), FILE_NAME})); + assertEquals(0, shell.run(new String[] { + "-setrep", "-w", Integer.toString(1 + REPLICATION_FACTOR), + FILE_NAME })); + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); } finally { cluster.shutdown(); } @@ -126,25 +139,30 @@ public class TestUnderReplicatedBlocks { final BlockManager bm = 
cluster.getNamesystem().getBlockManager(); ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, FILE_PATH); Iterator<DatanodeStorageInfo> storageInfos = - bm.blocksMap.getStorages(b.getLocalBlock()) - .iterator(); + bm.blocksMap.getStorages(b.getLocalBlock()).iterator(); DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor(); DatanodeDescriptor secondDn = storageInfos.next().getDatanodeDescriptor(); - bm.getDatanodeManager().removeDatanode(firstDn); + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); + bm.getDatanodeManager().removeDatanode(firstDn); + BlockManagerTestUtil.updateState(bm); assertEquals(NUM_OF_BLOCKS, bm.getUnderReplicatedNotMissingBlocks()); - bm.computeDatanodeWork(); + DFSTestUtil.verifyClientStats(conf, cluster); + bm.computeDatanodeWork(); assertTrue("The number of replication work pending before targets are " + "determined should be non-negative.", (Integer)Whitebox.getInternalState(secondDn, "pendingReplicationWithoutTargets") >= 0); + BlockManagerTestUtil.updateState(bm); assertTrue("The number of blocks to be replicated should be less than " + "or equal to " + bm.replicationStreamsHardLimit, secondDn.getNumberOfBlocksToBeReplicated() <= bm.replicationStreamsHardLimit); + DFSTestUtil.verifyClientStats(conf, cluster); } finally { cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java index 90eb7d1..22cba6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java @@ -199,7 +199,7 @@ public class TestReadOnlySharedStorage { assertThat(numberReplicas.replicasOnStaleNodes(), is(0)); BlockManagerTestUtil.updateState(blockManager); - assertThat(blockManager.getUnderReplicatedBlocksCount(), is(0L)); + assertThat(blockManager.getLowRedundancyBlocksCount(), is(0L)); assertThat(blockManager.getExcessBlocksCount(), is(0L)); } @@ -238,7 +238,7 @@ public class TestReadOnlySharedStorage { // The block should be reported as under-replicated BlockManagerTestUtil.updateState(blockManager); - assertThat(blockManager.getUnderReplicatedBlocksCount(), is(1L)); + assertThat(blockManager.getLowRedundancyBlocksCount(), is(1L)); // The BlockManager should be able to heal the replication count back to 1 // by triggering an inter-datanode replication from one of the READ_ONLY_SHARED replicas http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java index d268d01..71a9f6f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java @@ -118,7 +118,7 @@ public class TestLazyPersistFiles extends LazyPersistTestCase { assertThat(cluster.getNameNode() .getNamesystem() .getBlockManager() - .getUnderReplicatedBlocksCount(), + .getLowRedundancyBlocksCount(), is(0L)); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java index 555c2fa..c556699 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java @@ -424,7 +424,9 @@ public class TestAddStripedBlocks { cluster.getDataNodes().get(3).getDatanodeId(), reports[0]); BlockManagerTestUtil.updateState(ns.getBlockManager()); // the total number of corrupted block info is still 1 + Assert.assertEquals(1, ns.getCorruptECBlockGroupsStat()); Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + Assert.assertEquals(0, ns.getCorruptBlocksStat()); // 2 internal blocks corrupted Assert.assertEquals(2, bm.getCorruptReplicas(stored).size()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java index 3cf025c..11d7431 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java 
@@ -410,7 +410,7 @@ public class TestDecommissioningStatus { // All nodes are dead and decommed. Blocks should be missing. long missingBlocks = bm.getMissingBlocksCount(); - long underreplicated = bm.getUnderReplicatedBlocksCount(); + long underreplicated = bm.getLowRedundancyBlocksCount(); assertTrue(missingBlocks > 0); assertTrue(underreplicated > 0); @@ -440,7 +440,7 @@ public class TestDecommissioningStatus { // Blocks should be still be under-replicated Thread.sleep(2000); // Let replication monitor run - assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount()); + assertEquals(underreplicated, bm.getLowRedundancyBlocksCount()); // Start up a node. LOG.info("Starting two more nodes"); @@ -448,13 +448,13 @@ public class TestDecommissioningStatus { cluster.waitActive(); // Replication should fix it. int count = 0; - while((bm.getUnderReplicatedBlocksCount() > 0 || + while((bm.getLowRedundancyBlocksCount() > 0 || bm.getPendingReconstructionBlocksCount() > 0) && count++ < 10) { Thread.sleep(1000); } - assertEquals(0, bm.getUnderReplicatedBlocksCount()); + assertEquals(0, bm.getLowRedundancyBlocksCount()); assertEquals(0, bm.getPendingReconstructionBlocksCount()); assertEquals(0, bm.getMissingBlocksCount()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java index ed9ed3a..32c2a49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java @@ -37,6 +37,7 @@ import 
org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.CombinedHostFileManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@ -774,17 +775,24 @@ public class TestNameNodeMXBean { } MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - ObjectName mxbeanName = new ObjectName( + ObjectName replStateMBeanName = new ObjectName( + "Hadoop:service=NameNode,name=ReplicatedBlocksState"); + ObjectName ecBlkGrpStateMBeanName = new ObjectName( + "Hadoop:service=NameNode,name=ECBlockGroupsState"); + ObjectName namenodeMXBeanName = new ObjectName( "Hadoop:service=NameNode,name=NameNodeInfo"); // Wait for the metrics to discover the unrecoverable block group + long expectedMissingBlockCount = 1L; + long expectedCorruptBlockCount = 1L; GenericTestUtils.waitFor(new Supplier<Boolean>() { @Override public Boolean get() { try { Long numMissingBlocks = - (Long) mbs.getAttribute(mxbeanName, "NumberOfMissingBlocks"); - if (numMissingBlocks == 1L) { + (Long) mbs.getAttribute(namenodeMXBeanName, + "NumberOfMissingBlocks"); + if (numMissingBlocks == expectedMissingBlockCount) { return true; } } catch (Exception e) { @@ -794,7 +802,43 @@ public class TestNameNodeMXBean { } }, 1000, 60000); - String corruptFiles = (String) (mbs.getAttribute(mxbeanName, + BlockManagerTestUtil.updateState( + cluster.getNamesystem().getBlockManager()); + + // Verification of missing blocks + long totalMissingBlocks = cluster.getNamesystem().getMissingBlocksCount(); + Long replicaMissingBlocks = + (Long) mbs.getAttribute(replStateMBeanName, + "MissingBlocksStat"); + Long ecMissingBlocks = + (Long) 
mbs.getAttribute(ecBlkGrpStateMBeanName, + "MissingECBlockGroupsStat"); + assertEquals("Unexpected total missing blocks!", + expectedMissingBlockCount, totalMissingBlocks); + assertEquals("Unexpected total missing blocks!", + totalMissingBlocks, + (replicaMissingBlocks + ecMissingBlocks)); + assertEquals("Unexpected total ec missing blocks!", + expectedMissingBlockCount, ecMissingBlocks.longValue()); + + // Verification of corrupt blocks + long totalCorruptBlocks = + cluster.getNamesystem().getCorruptReplicaBlocks(); + Long replicaCorruptBlocks = + (Long) mbs.getAttribute(replStateMBeanName, + "CorruptBlocksStat"); + Long ecCorruptBlocks = + (Long) mbs.getAttribute(ecBlkGrpStateMBeanName, + "CorruptECBlockGroupsStat"); + assertEquals("Unexpected total corrupt blocks!", + expectedCorruptBlockCount, totalCorruptBlocks); + assertEquals("Unexpected total corrupt blocks!", + totalCorruptBlocks, + (replicaCorruptBlocks + ecCorruptBlocks)); + assertEquals("Unexpected total ec corrupt blocks!", + expectedCorruptBlockCount, ecCorruptBlocks.longValue()); + + String corruptFiles = (String) (mbs.getAttribute(namenodeMXBeanName, "CorruptFiles")); int numCorruptFiles = ((Object[]) JSON.parse(corruptFiles)).length; assertEquals(1, numCorruptFiles); http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java index 34fec5b..540ae63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; @@ -50,6 +51,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.BitSet; +import java.util.Iterator; import java.util.List; import static org.junit.Assert.assertEquals; @@ -157,6 +159,8 @@ public class TestReconstructStripedBlocks { assertEquals(numBlocks, missedNode.numBlocks()); bm.getDatanodeManager().removeDatanode(missedNode); } + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); BlockManagerTestUtil.getComputedDatanodeWork(bm); @@ -185,6 +189,8 @@ public class TestReconstructStripedBlocks { info.getSourceDnInfos().length); } } + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); } finally { cluster.shutdown(); } @@ -212,6 +218,8 @@ public class TestReconstructStripedBlocks { final byte[] data = new byte[fileLen]; DFSTestUtil.writeFile(fs, p, data); DFSTestUtil.waitForReplication(fs, p, groupSize, 5000); + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, cluster); LocatedStripedBlock lb = (LocatedStripedBlock)fs.getClient() .getLocatedBlocks(p.toString(), 0).get(0); @@ -219,16 +227,20 @@ public class TestReconstructStripedBlocks { cellSize, dataBlocks, parityBlocks); BlockManagerTestUtil.getComputedDatanodeWork(bm); + BlockManagerTestUtil.updateState(bm); assertEquals(0, 
getNumberOfBlocksToBeErasureCoded(cluster)); assertEquals(0, bm.getPendingReconstructionBlocksCount()); + DFSTestUtil.verifyClientStats(conf, cluster); // missing 1 block, so 1 task should be scheduled DatanodeInfo dn0 = lbs[0].getLocations()[0]; cluster.stopDataNode(dn0.getName()); cluster.setDataNodeDead(dn0); BlockManagerTestUtil.getComputedDatanodeWork(bm); + BlockManagerTestUtil.updateState(bm); assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster)); assertEquals(1, bm.getPendingReconstructionBlocksCount()); + DFSTestUtil.verifyClientStats(conf, cluster); // missing another block, but no new task should be scheduled because // previous task isn't finished. @@ -236,8 +248,10 @@ public class TestReconstructStripedBlocks { cluster.stopDataNode(dn1.getName()); cluster.setDataNodeDead(dn1); BlockManagerTestUtil.getComputedDatanodeWork(bm); + BlockManagerTestUtil.updateState(bm); assertEquals(1, getNumberOfBlocksToBeErasureCoded(cluster)); assertEquals(1, bm.getPendingReconstructionBlocksCount()); + DFSTestUtil.verifyClientStats(conf, cluster); } finally { cluster.shutdown(); } @@ -294,6 +308,7 @@ public class TestReconstructStripedBlocks { // bring the dn back: 10 internal blocks now cluster.restartDataNode(dnProp); cluster.waitActive(); + DFSTestUtil.verifyClientStats(conf, cluster); // stop another dn: 9 internal blocks, but only cover 8 real one dnToStop = block.getLocations()[1]; @@ -352,4 +367,72 @@ public class TestReconstructStripedBlocks { cluster.shutdown(); } } + + @Test(timeout=120000) // 2 min timeout + public void testReconstructionWork() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + 1000); + conf.setInt( + 
DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, + 5); + + ErasureCodingPolicy policy = SystemErasureCodingPolicies.getByID( + SystemErasureCodingPolicies.XOR_2_1_POLICY_ID); + conf.setStrings(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, + policy.getName()); + Path ecDir = new Path("/ec"); + Path ecFilePath = new Path(ecDir, "ec-file"); + int blockGroups = 2; + int totalDataNodes = policy.getNumDataUnits() + + policy.getNumParityUnits() + 1; + + MiniDFSCluster dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes( + totalDataNodes).build(); + try { + // create an EC file with 2 block groups + final DistributedFileSystem fs = dfsCluster.getFileSystem(); + fs.mkdirs(ecDir); + fs.setErasureCodingPolicy(ecDir, policy.getName()); + DFSTestUtil.createStripedFile(dfsCluster, ecFilePath, ecDir, + blockGroups, 2, false, policy); + + final BlockManager bm = dfsCluster.getNamesystem().getBlockManager(); + LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations( + ecFilePath.toString(), 0, blockGroups); + assert lbs.get(0) instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0)); + + Iterator<DatanodeStorageInfo> storageInfos = + bm.getStorages(bg.getBlock().getLocalBlock()).iterator(); + DatanodeDescriptor firstDn = storageInfos.next().getDatanodeDescriptor(); + + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.verifyClientStats(conf, dfsCluster); + + // Remove one of the DataUnit nodes + bm.getDatanodeManager().removeDatanode(firstDn); + + // Verify low redundancy count matching EC block groups count + BlockManagerTestUtil.updateState(bm); + assertEquals(blockGroups, bm.getLowRedundancyECBlockGroupsStat()); + DFSTestUtil.verifyClientStats(conf, dfsCluster); + + + // Trigger block group reconstruction + BlockManagerTestUtil.getComputedDatanodeWork(bm); + BlockManagerTestUtil.updateState(bm); + + // Verify pending reconstruction count + assertEquals(blockGroups, 
getNumberOfBlocksToBeErasureCoded(dfsCluster)); + assertEquals(0, bm.getLowRedundancyECBlockGroupsStat()); + DFSTestUtil.verifyClientStats(conf, dfsCluster); + } finally { + dfsCluster.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/999c8fcb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java index 4ad742e..c84f8e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystemTestWrapper; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag; import org.apache.hadoop.hdfs.client.HdfsAdmin; @@ -57,8 +58,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; import 
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; @@ -90,17 +95,23 @@ public class TestNameNodeMetrics { new Path("/testNameNodeMetrics"); private static final String NN_METRICS = "NameNodeActivity"; private static final String NS_METRICS = "FSNamesystem"; + private static final int BLOCK_SIZE = 1024 * 1024; + private static final ErasureCodingPolicy EC_POLICY = + SystemErasureCodingPolicies.getByID( + SystemErasureCodingPolicies.XOR_2_1_POLICY_ID); + public static final Log LOG = LogFactory.getLog(TestNameNodeMetrics.class); // Number of datanodes in the cluster - private static final int DATANODE_COUNT = 3; + private static final int DATANODE_COUNT = EC_POLICY.getNumDataUnits() + + EC_POLICY.getNumParityUnits() + 1; private static final int WAIT_GAUGE_VALUE_RETRIES = 20; // Rollover interval of percentile metrics (in seconds) private static final int PERCENTILES_INTERVAL = 1; static { - CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100); + CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); CONF.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1); CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFS_REDUNDANCY_INTERVAL); @@ -109,7 +120,11 @@ public class TestNameNodeMetrics { CONF.set(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, "" + PERCENTILES_INTERVAL); // Enable stale DataNodes checking - CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); + CONF.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); + // Enable erasure coding + CONF.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, + EC_POLICY.getName()); GenericTestUtils.setLogLevel(LogFactory.getLog(MetricsAsserts.class), Level.DEBUG); } @@ -119,18 +134,23 @@ public class TestNameNodeMetrics { private final Random rand = new Random(); private FSNamesystem 
namesystem; private BlockManager bm; + private Path ecDir; private static Path getTestPath(String fileName) { return new Path(TEST_ROOT_DIR_PATH, fileName); } - + @Before public void setUp() throws Exception { - cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build(); + cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT) + .build(); cluster.waitActive(); namesystem = cluster.getNamesystem(); bm = namesystem.getBlockManager(); fs = cluster.getFileSystem(); + ecDir = getTestPath("/ec"); + fs.mkdirs(ecDir); + fs.setErasureCodingPolicy(ecDir, EC_POLICY.getName()); } @After @@ -219,49 +239,125 @@ public class TestNameNodeMetrics { /** Test metrics associated with addition of a file */ @Test public void testFileAdd() throws Exception { - // Add files with 100 blocks - final Path file = getTestPath("testFileAdd"); - createFile(file, 3200, (short)3); + // File creations final long blockCount = 32; + final Path normalFile = getTestPath("testFileAdd"); + createFile(normalFile, blockCount * BLOCK_SIZE, (short)3); + final Path ecFile = new Path(ecDir, "ecFile.log"); + DFSTestUtil.createStripedFile(cluster, ecFile, null, (int) blockCount, 1, + false, EC_POLICY); + int blockCapacity = namesystem.getBlockCapacity(); assertGauge("BlockCapacity", blockCapacity, getMetrics(NS_METRICS)); MetricsRecordBuilder rb = getMetrics(NN_METRICS); - // File create operations is 1 - // Number of files created is depth of <code>file</code> path - assertCounter("CreateFileOps", 1L, rb); - assertCounter("FilesCreated", (long)file.depth(), rb); - - long filesTotal = file.depth() + 1; // Add 1 for root + // File create operations are 2 + assertCounter("CreateFileOps", 2L, rb); + // Number of files created is depth of normalFile and ecFile, after + // removing the duplicate accounting for root test dir. 
+ assertCounter("FilesCreated", + (long)(normalFile.depth() + ecFile.depth()), rb); + + long filesTotal = normalFile.depth() + ecFile.depth() + 1 /* ecDir */; rb = getMetrics(NS_METRICS); assertGauge("FilesTotal", filesTotal, rb); - assertGauge("BlocksTotal", blockCount, rb); - fs.delete(file, true); + assertGauge("BlocksTotal", blockCount * 2, rb); + fs.delete(normalFile, true); filesTotal--; // reduce the filecount for deleted file rb = waitForDnMetricValue(NS_METRICS, "FilesTotal", filesTotal); + assertGauge("BlocksTotal", blockCount, rb); + assertGauge("PendingDeletionBlocks", 0L, rb); + + fs.delete(ecFile, true); + filesTotal--; + rb = waitForDnMetricValue(NS_METRICS, "FilesTotal", filesTotal); assertGauge("BlocksTotal", 0L, rb); assertGauge("PendingDeletionBlocks", 0L, rb); rb = getMetrics(NN_METRICS); // Delete file operations and number of files deleted must be 1 - assertCounter("DeleteFileOps", 1L, rb); - assertCounter("FilesDeleted", 1L, rb); + assertCounter("DeleteFileOps", 2L, rb); + assertCounter("FilesDeleted", 2L, rb); } - + + /** + * Verify low redundancy and corrupt blocks metrics are zero. 
+ * @throws Exception + */ + private void verifyZeroMetrics() throws Exception { + BlockManagerTestUtil.updateState(bm); + MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS, + "CorruptBlocks", 0L, 500); + + // Verify aggregated blocks metrics + assertGauge("UnderReplicatedBlocks", 0L, rb); // Deprecated metric + assertGauge("LowRedundancyBlocks", 0L, rb); + assertGauge("PendingReplicationBlocks", 0L, rb); // Deprecated metric + assertGauge("PendingReconstructionBlocks", 0L, rb); + + // Verify replica metrics + assertGauge("LowRedundancyReplicatedBlocks", 0L, rb); + assertGauge("CorruptReplicatedBlocks", 0L, rb); + + // Verify striped block groups metrics + assertGauge("LowRedundancyECBlockGroups", 0L, rb); + assertGauge("CorruptECBlockGroups", 0L, rb); + } + + /** + * Verify aggregated metrics equals the sum of replicated blocks metrics + * and erasure coded blocks metrics. + * @throws Exception + */ + private void verifyAggregatedMetricsTally() throws Exception { + BlockManagerTestUtil.updateState(bm); + assertEquals("Under replicated metrics not matching!", + namesystem.getLowRedundancyBlocks(), + namesystem.getUnderReplicatedBlocks()); + assertEquals("Low redundancy metrics not matching!", + namesystem.getLowRedundancyBlocks(), + namesystem.getLowRedundancyBlocksStat() + + namesystem.getLowRedundancyECBlockGroupsStat()); + assertEquals("Corrupt blocks metrics not matching!", + namesystem.getCorruptReplicaBlocks(), + namesystem.getCorruptBlocksStat() + + namesystem.getCorruptECBlockGroupsStat()); + assertEquals("Missing blocks metrics not matching!", + namesystem.getMissingBlocksCount(), + namesystem.getMissingBlocksStat() + + namesystem.getMissingECBlockGroupsStat()); + assertEquals("Missing blocks with replication factor one not matching!", + namesystem.getMissingReplOneBlocksCount(), + namesystem.getMissingReplicationOneBlocksStat()); + assertEquals("Bytes in future blocks metrics not matching!", + namesystem.getBytesInFuture(), + 
namesystem.getBlocksBytesInFutureStat() + + namesystem.getECBlocksBytesInFutureStat()); + assertEquals("Pending deletion blocks metrics not matching!", + namesystem.getPendingDeletionBlocks(), + namesystem.getPendingDeletionBlocksStat() + + namesystem.getPendingDeletionECBlockGroupsStat()); + } + /** Corrupt a block and ensure metrics reflects it */ @Test public void testCorruptBlock() throws Exception { // Create a file with single block with two replicas final Path file = getTestPath("testCorruptBlock"); - createFile(file, 100, (short)2); - + final short replicaCount = 2; + createFile(file, 100, replicaCount); + DFSTestUtil.waitForReplication(fs, file, replicaCount, 15000); + // Disable the heartbeats, so that no corrupted replica // can be fixed for (DataNode dn : cluster.getDataNodes()) { DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); } - + + verifyZeroMetrics(); + verifyAggregatedMetricsTally(); + // Corrupt first replica of the block LocatedBlock block = NameNodeAdapter.getBlockLocations( cluster.getNameNode(), file.toString(), 0, 1).get(0); @@ -272,12 +368,50 @@ public class TestNameNodeMetrics { } finally { cluster.getNamesystem().writeUnlock(); } + + BlockManagerTestUtil.updateState(bm); + MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS, + "CorruptBlocks", 1L, 500); + // Verify aggregated blocks metrics + assertGauge("LowRedundancyBlocks", 1L, rb); + assertGauge("PendingReplicationBlocks", 0L, rb); + assertGauge("PendingReconstructionBlocks", 0L, rb); + // Verify replicated blocks metrics + assertGauge("LowRedundancyReplicatedBlocks", 1L, rb); + assertGauge("CorruptReplicatedBlocks", 1L, rb); + // Verify striped blocks metrics + assertGauge("LowRedundancyECBlockGroups", 0L, rb); + assertGauge("CorruptECBlockGroups", 0L, rb); + + verifyAggregatedMetricsTally(); + + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); + } + + // Start block reconstruction work 
BlockManagerTestUtil.getComputedDatanodeWork(bm); - MetricsRecordBuilder rb = getMetrics(NS_METRICS); - assertGauge("CorruptBlocks", 1L, rb); - assertGauge("PendingReplicationBlocks", 1L, rb); - + + BlockManagerTestUtil.updateState(bm); + DFSTestUtil.waitForReplication(fs, file, replicaCount, 30000); + rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500); + + // Verify aggregated blocks metrics + assertGauge("LowRedundancyBlocks", 0L, rb); + assertGauge("CorruptBlocks", 0L, rb); + assertGauge("PendingReplicationBlocks", 0L, rb); + assertGauge("PendingReconstructionBlocks", 0L, rb); + // Verify replicated blocks metrics + assertGauge("LowRedundancyReplicatedBlocks", 0L, rb); + assertGauge("CorruptReplicatedBlocks", 0L, rb); + // Verify striped blocks metrics + assertGauge("LowRedundancyECBlockGroups", 0L, rb); + assertGauge("CorruptECBlockGroups", 0L, rb); + + verifyAggregatedMetricsTally(); + fs.delete(file, true); + BlockManagerTestUtil.getComputedDatanodeWork(bm); // During the file deletion, both BlockManager#corruptReplicas and // BlockManager#pendingReplications will be updated, i.e., the records // for the blocks of the deleted file will be removed from both @@ -287,11 +421,97 @@ public class TestNameNodeMetrics { // BlockManager#updateState is called. And in // BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks // will also be updated. 
- rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L); + BlockManagerTestUtil.updateState(bm); + waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500); + verifyZeroMetrics(); + verifyAggregatedMetricsTally(); + } + + @Test (timeout = 90000L) + public void testStripedFileCorruptBlocks() throws Exception { + final long fileLen = BLOCK_SIZE * 4; + final Path ecFile = new Path(ecDir, "ecFile.log"); + DFSTestUtil.createFile(fs, ecFile, fileLen, (short) 1, 0L); + StripedFileTestUtil.waitBlockGroupsReported(fs, ecFile.toString()); + + // Disable the heartbeats, so that no corrupted replica + // can be fixed + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + } + + verifyZeroMetrics(); + verifyAggregatedMetricsTally(); + + // Corrupt first replica of the block + LocatedBlocks lbs = fs.getClient().getNamenode().getBlockLocations( + ecFile.toString(), 0, fileLen); + assert lbs.get(0) instanceof LocatedStripedBlock; + LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0)); + + cluster.getNamesystem().writeLock(); + try { + bm.findAndMarkBlockAsCorrupt(bg.getBlock(), bg.getLocations()[0], + "STORAGE_ID", "TEST"); + } finally { + cluster.getNamesystem().writeUnlock(); + } + + BlockManagerTestUtil.updateState(bm); + MetricsRecordBuilder rb = waitForDnMetricValue(NS_METRICS, + "CorruptBlocks", 1L, 500); + // Verify aggregated blocks metrics + assertGauge("LowRedundancyBlocks", 1L, rb); assertGauge("PendingReplicationBlocks", 0L, rb); - assertGauge("ScheduledReplicationBlocks", 0L, rb); + assertGauge("PendingReconstructionBlocks", 0L, rb); + // Verify replica metrics + assertGauge("LowRedundancyReplicatedBlocks", 0L, rb); + assertGauge("CorruptReplicatedBlocks", 0L, rb); + // Verify striped block groups metrics + assertGauge("LowRedundancyECBlockGroups", 1L, rb); + assertGauge("CorruptECBlockGroups", 1L, rb); + + verifyAggregatedMetricsTally(); + + for (DataNode dn : cluster.getDataNodes()) { + 
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false); + } + + // Start block reconstruction work + BlockManagerTestUtil.getComputedDatanodeWork(bm); + BlockManagerTestUtil.updateState(bm); + StripedFileTestUtil.waitForReconstructionFinished(ecFile, fs, 3); + + rb = waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500); + assertGauge("CorruptBlocks", 0L, rb); + assertGauge("PendingReplicationBlocks", 0L, rb); + assertGauge("PendingReconstructionBlocks", 0L, rb); + // Verify replicated blocks metrics + assertGauge("LowRedundancyReplicatedBlocks", 0L, rb); + assertGauge("CorruptReplicatedBlocks", 0L, rb); + // Verify striped blocks metrics + assertGauge("LowRedundancyECBlockGroups", 0L, rb); + assertGauge("CorruptECBlockGroups", 0L, rb); + + verifyAggregatedMetricsTally(); + + fs.delete(ecFile, true); + BlockManagerTestUtil.getComputedDatanodeWork(bm); + // During the file deletion, both BlockManager#corruptReplicas and + // BlockManager#pendingReplications will be updated, i.e., the records + // for the blocks of the deleted file will be removed from both + // corruptReplicas and pendingReplications. The corresponding + // metrics (CorruptBlocks and PendingReplicationBlocks) will only be updated + // when BlockManager#computeDatanodeWork is run where the + // BlockManager#updateState is called. And in + // BlockManager#computeDatanodeWork the metric ScheduledReplicationBlocks + // will also be updated. + BlockManagerTestUtil.updateState(bm); + waitForDnMetricValue(NS_METRICS, "CorruptBlocks", 0L, 500); + verifyZeroMetrics(); + verifyAggregatedMetricsTally(); } - + /** Create excess blocks by reducing the replication factor for * for a file and ensure metrics reflects it */ @@ -340,7 +560,7 @@ public class TestNameNodeMetrics { private void waitForDeletion() throws InterruptedException { // Wait for more than DATANODE_COUNT replication intervals to ensure all // the blocks pending deletion are sent for deletion to the datanodes. 
- Thread.sleep(DFS_REDUNDANCY_INTERVAL * (DATANODE_COUNT + 1) * 1000); + Thread.sleep(DFS_REDUNDANCY_INTERVAL * DATANODE_COUNT * 1000); } /** @@ -358,20 +578,25 @@ public class TestNameNodeMetrics { * @throws Exception if something went wrong. */ private MetricsRecordBuilder waitForDnMetricValue(String source, - String name, - long expected) - throws Exception { + String name, long expected) throws Exception { + // initial wait + waitForDeletion(); + return waitForDnMetricValue(source, name, expected, + DFS_REDUNDANCY_INTERVAL * 500); + } + + private MetricsRecordBuilder waitForDnMetricValue(String source, + String name, long expected, long sleepInterval) throws Exception { MetricsRecordBuilder rb; long gauge; - //initial wait. - waitForDeletion(); - //lots of retries are allowed for slow systems; fast ones will still - //exit early - int retries = (DATANODE_COUNT + 1) * WAIT_GAUGE_VALUE_RETRIES; + // Lots of retries are allowed for slow systems. + // Fast ones will still exit early. + int retries = DATANODE_COUNT * WAIT_GAUGE_VALUE_RETRIES; rb = getMetrics(source); gauge = MetricsAsserts.getLongGauge(name, rb); while (gauge != expected && (--retries > 0)) { - Thread.sleep(DFS_REDUNDANCY_INTERVAL * 500); + Thread.sleep(sleepInterval); + BlockManagerTestUtil.updateState(bm); rb = getMetrics(source); gauge = MetricsAsserts.getLongGauge(name, rb); } @@ -516,22 +741,22 @@ public class TestNameNodeMetrics { getMetrics(NS_METRICS)); assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS)); - assertGauge("LastWrittenTransactionId", 1L, getMetrics(NS_METRICS)); - assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS)); - assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS)); + assertGauge("LastWrittenTransactionId", 3L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastCheckpoint", 3L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastLogRoll", 3L, getMetrics(NS_METRICS)); fs.mkdirs(new 
Path(TEST_ROOT_DIR_PATH, "/tmp")); assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS)); - assertGauge("LastWrittenTransactionId", 2L, getMetrics(NS_METRICS)); - assertGauge("TransactionsSinceLastCheckpoint", 2L, getMetrics(NS_METRICS)); - assertGauge("TransactionsSinceLastLogRoll", 2L, getMetrics(NS_METRICS)); + assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastLogRoll", 4L, getMetrics(NS_METRICS)); cluster.getNameNodeRpc().rollEditLog(); assertGauge("LastCheckpointTime", lastCkptTime, getMetrics(NS_METRICS)); - assertGauge("LastWrittenTransactionId", 4L, getMetrics(NS_METRICS)); - assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS)); + assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS)); + assertGauge("TransactionsSinceLastCheckpoint", 6L, getMetrics(NS_METRICS)); assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS)); cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false); @@ -541,7 +766,7 @@ public class TestNameNodeMetrics { long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime", getMetrics(NS_METRICS)); assertTrue(lastCkptTime < newLastCkptTime); - assertGauge("LastWrittenTransactionId", 6L, getMetrics(NS_METRICS)); + assertGauge("LastWrittenTransactionId", 8L, getMetrics(NS_METRICS)); assertGauge("TransactionsSinceLastCheckpoint", 1L, getMetrics(NS_METRICS)); assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS)); } @@ -554,10 +779,10 @@ public class TestNameNodeMetrics { public void testSyncAndBlockReportMetric() throws Exception { MetricsRecordBuilder rb = getMetrics(NN_METRICS); // We have one sync when the cluster starts up, just opening the journal - assertCounter("SyncsNumOps", 1L, rb); + assertCounter("SyncsNumOps", 3L, rb); // Each datanode reports in when the cluster comes up 
assertCounter("BlockReportNumOps", - (long)DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb); + (long) DATANODE_COUNT * cluster.getStoragesPerDatanode(), rb); // Sleep for an interval+slop to let the percentiles rollover Thread.sleep((PERCENTILES_INTERVAL+1)*1000); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
