HDFS-10530. BlockManager reconstruction work scheduling should correctly adhere to EC block placement policy. Contributed by Manoj Govindassamy and Rui Gao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4812518b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4812518b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4812518b Branch: refs/heads/HADOOP-13345 Commit: 4812518b23cac496ab5cdad5258773bcd9728770 Parents: 09ad8ef Author: Andrew Wang <[email protected]> Authored: Thu Mar 16 15:07:38 2017 -0700 Committer: Andrew Wang <[email protected]> Committed: Thu Mar 16 15:07:38 2017 -0700 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 2 +- .../hdfs/server/balancer/TestBalancer.java | 11 +- .../blockmanagement/TestBlockManager.java | 113 ++++++++++++++++++- 3 files changed, 119 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4812518b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 5dc40fa..be30e78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -4179,7 +4179,7 @@ public class BlockManager implements BlockStatsMXBean { BlockPlacementPolicy placementPolicy = placementPolicies .getPolicy(blockType); int numReplicas = blockType == STRIPED ? ((BlockInfoStriped) storedBlock) - .getRealDataBlockNum() : storedBlock.getReplication(); + .getRealTotalBlockNum() : storedBlock.getReplication(); return placementPolicy.verifyBlockPlacement(locs, numReplicas) .isPlacementPolicySatisfied(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4812518b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index d2e8f44..30a3a32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -1918,7 +1918,7 @@ public class TestBalancer { } private void doTestBalancerWithStripedFile(Configuration conf) throws Exception { - int numOfDatanodes = dataBlocks + parityBlocks + 2; + int numOfDatanodes = dataBlocks + parityBlocks + 3; int numOfRacks = dataBlocks; long capacity = 20 * defaultBlockSize; long[] capacities = new long[numOfDatanodes]; @@ -1956,11 +1956,12 @@ public class TestBalancer { LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); - // add one datanode + // add datanodes in new rack String newRack = "/rack" + (++numOfRacks); - cluster.startDataNodes(conf, 1, true, null, - new String[]{newRack}, null, new long[]{capacity}); - totalCapacity += capacity; + cluster.startDataNodes(conf, 2, true, null, + new String[]{newRack, newRack}, null, + new long[]{capacity, capacity}); + totalCapacity += capacity*2; cluster.triggerHeartbeats(); // run balancer and validate results http://git-wip-us.apache.org/repos/asf/hadoop/blob/4812518b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 00bea1c..36dafa5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; @@ -55,6 +56,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; @@ -68,6 +71,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.StripedFileTestUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -125,13 +129,14 @@ public class TestBlockManager { * of times trying to trigger the incorrect behavior. */ private static final int NUM_TEST_ITERS = 30; - private static final int BLOCK_SIZE = 64*1024; + private static final Log LOG = LogFactory.getLog(TestBlockManager.class); private FSNamesystem fsn; private BlockManager bm; private long mockINodeId; + @Before public void setupMockCluster() throws IOException { Configuration conf = new HdfsConfiguration(); @@ -1287,4 +1292,110 @@ public class TestBlockManager { isReplicaCorrupt(Mockito.any(BlockInfo.class), Mockito.any(DatanodeDescriptor.class)); } + + @Test (timeout = 300000) + public void testPlacementPolicySatisfied() throws Exception { + LOG.info("Starting testPlacementPolicySatisfied."); + final String[] initialRacks = new String[]{ + "/rack0", "/rack1", "/rack2", "/rack3", "/rack4", "/rack5"}; + final String[] initialHosts = new String[]{ + "host0", "host1", "host2", "host3", "host4", "host5"}; + final int numDataBlocks = StripedFileTestUtil.getDefaultECPolicy() + .getNumDataUnits(); + final int numParityBlocks = StripedFileTestUtil.getDefaultECPolicy() + .getNumParityUnits(); + final long blockSize = 64 * 1024; + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1); + conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY, + StripedFileTestUtil.getDefaultECPolicy().getName()); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf) + .racks(initialRacks) + .hosts(initialHosts) + .numDataNodes(initialRacks.length) + .build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final Path ecDir = new Path("/ec"); + final Path testFileUnsatisfied = new Path(ecDir, "test1"); + final Path testFileSatisfied = new Path(ecDir, "test2"); + cluster.getFileSystem().getClient().mkdirs(ecDir.toString(), null, true); + cluster.getFileSystem().getClient() + .setErasureCodingPolicy(ecDir.toString(), + StripedFileTestUtil.getDefaultECPolicy().getName()); + long fileLen = blockSize * numDataBlocks; + + // Create a file to be stored in 6 racks. + DFSTestUtil.createFile(dfs, testFileUnsatisfied, fileLen, (short) 1, 1); + // Block placement policy should be satisfied as rack count + // is less than numDataBlocks + numParityBlocks. + verifyPlacementPolicy(cluster, testFileUnsatisfied, true); + + LOG.info("Adding 3 new hosts in the existing racks."); + cluster.startDataNodes(conf, 3, true, null, + new String[]{"/rack3", "/rack4", "/rack5"}, + new String[]{"host3-2", "host4-2", "host5-2"}, null); + cluster.triggerHeartbeats(); + + LOG.info("Waiting for EC reconstruction to complete."); + DFSTestUtil.waitForReplication(dfs, testFileUnsatisfied, + (short)(numDataBlocks + numParityBlocks), 30 * 1000); + // Block placement policy should still be satisfied + // as there are only 6 racks. + verifyPlacementPolicy(cluster, testFileUnsatisfied, true); + + LOG.info("Adding 3 new hosts in 3 new racks."); + cluster.startDataNodes(conf, 3, true, null, + new String[]{"/rack6", "/rack7", "/rack8"}, + new String[]{"host6", "host7", "host8"}, + null); + cluster.triggerHeartbeats(); + // Addition of new racks can make the existing EC files block + // placements unsatisfied and there is NO automatic block + // reconstruction for this yet. + // TODO: + // Verify for block placement satisfied once the automatic + // block reconstruction is implemented. + verifyPlacementPolicy(cluster, testFileUnsatisfied, false); + + // Create a new file + DFSTestUtil.createFile(dfs, testFileSatisfied, fileLen, (short) 1, 1); + // The new file should be rightly placed on all 9 racks + // and the block placement policy should be satisfied. + verifyPlacementPolicy(cluster, testFileUnsatisfied, false); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void verifyPlacementPolicy(final MiniDFSCluster cluster, + final Path file, boolean isBlockPlacementSatisfied) throws IOException { + DistributedFileSystem dfs = cluster.getFileSystem(); + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + LocatedBlock lb = DFSTestUtil.getAllBlocks(dfs, file).get(0); + BlockInfo blockInfo = + blockManager.getStoredBlock(lb.getBlock().getLocalBlock()); + Iterator<DatanodeStorageInfo> itr = blockInfo.getStorageInfos(); + LOG.info("Block " + blockInfo + " storages: "); + while (itr.hasNext()) { + DatanodeStorageInfo dn = itr.next(); + LOG.info(" Rack: " + dn.getDatanodeDescriptor().getNetworkLocation() + + ", DataNode: " + dn.getDatanodeDescriptor().getXferAddr()); + } + if (isBlockPlacementSatisfied) { + assertTrue("Block group of " + file + "should be placement" + + " policy satisfied, currently!", + blockManager.isPlacementPolicySatisfied(blockInfo)); + } else { + assertFalse("Block group of " + file + " should be placement" + + " policy unsatisfied, currently!", + blockManager.isPlacementPolicySatisfied(blockInfo)); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
