Repository: hadoop Updated Branches: refs/heads/trunk 3929ac934 -> a68b6eb0f
HDFS-9015. Refactor TestReplicationPolicy to test different block placement policies. (Ming Ma via lei) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a68b6eb0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a68b6eb0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a68b6eb0 Branch: refs/heads/trunk Commit: a68b6eb0f4110ba626a44fad6b9eb5d8c5a4901f Parents: 3929ac9 Author: Lei Xu <[email protected]> Authored: Fri Oct 2 11:41:23 2015 -0700 Committer: Lei Xu <[email protected]> Committed: Fri Oct 2 11:42:09 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../BaseReplicationPolicyTest.java | 160 ++++++++++++++++++ .../blockmanagement/TestReplicationPolicy.java | 148 ++++------------- .../TestReplicationPolicyConsiderLoad.java | 121 ++++---------- .../TestReplicationPolicyWithNodeGroup.java | 161 ++++--------------- 5 files changed, 257 insertions(+), 336 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a68b6eb0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 49ea985..a280add 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1010,6 +1010,9 @@ Release 2.8.0 - UNRELEASED 'CredentialBasedAccessTokenProvider.getCredential()' abstract methods to public (Santhosh Nayak via cnauroth) + HDFS-9015. Refactor TestReplicationPolicy to test different block placement + policies. (Ming Ma via lei) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/a68b6eb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java new file mode 100644 index 0000000..c541da3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BaseReplicationPolicyTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.TestBlockStoragePolicy; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.PathUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Before; + +abstract public class BaseReplicationPolicyTest { + { + GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL); + } + + protected NetworkTopology cluster; + protected DatanodeDescriptor dataNodes[]; + protected static final int BLOCK_SIZE = 1024; + protected NameNode namenode; + protected DatanodeManager dnManager; + protected BlockPlacementPolicy replicator; + protected final String filename = "/dummyfile.txt"; + protected DatanodeStorageInfo[] storages; + protected String blockPlacementPolicy; + protected NamenodeProtocols nameNodeRpc = null; + + static void updateHeartbeatWithUsage(DatanodeDescriptor dn, + long capacity, long dfsUsed, long remaining, long blockPoolUsed, + long dnCacheCapacity, long dnCacheUsed, int xceiverCount, + int volFailures) { + dn.getStorageInfos()[0].setUtilizationForTesting( + capacity, dfsUsed, remaining, blockPoolUsed); + dn.updateHeartbeat( + BlockManagerTestUtil.getStorageReportsForDatanode(dn), + dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); + } + + abstract DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf); + + @Before + public void setupCluster() throws Exception { + Configuration conf = new HdfsConfiguration(); + dataNodes = getDatanodeDescriptors(conf); + + FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + new File(baseDir, "name").getPath()); + conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + blockPlacementPolicy); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); + conf.setBoolean( + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); + DFSTestUtil.formatNameNode(conf); + namenode = new NameNode(conf); + nameNodeRpc = namenode.getRpcServer(); + + final BlockManager bm = namenode.getNamesystem().getBlockManager(); + replicator = bm.getBlockPlacementPolicy(); + cluster = bm.getDatanodeManager().getNetworkTopology(); + dnManager = bm.getDatanodeManager(); + // construct network topology + for (int i=0; i < dataNodes.length; i++) { + cluster.add(dataNodes[i]); + //bm.getDatanodeManager().getHost2DatanodeMap().add(dataNodes[i]); + bm.getDatanodeManager().getHeartbeatManager().addDatanode( + dataNodes[i]); + } + updateHeartbeatWithUsage(); + } + + void updateHeartbeatWithUsage() { + for (int i=0; i < dataNodes.length; i++) { + updateHeartbeatWithUsage(dataNodes[i], + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); + } + } + + @After + public void tearDown() throws Exception { + namenode.stop(); + } + + boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) { + return isOnSameRack(left, right.getDatanodeDescriptor()); + } + + boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) { + return cluster.isOnSameRack(left.getDatanodeDescriptor(), right); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas) { + return chooseTarget(numOfReplicas, dataNodes[0]); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer) { + return chooseTarget(numOfReplicas, writer, + new ArrayList<DatanodeStorageInfo>()); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + List<DatanodeStorageInfo> chosenNodes) { + return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) { + return chooseTarget(numOfReplicas, writer, chosenNodes, null); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) { + return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, + excludedNodes); + } + + DatanodeStorageInfo[] chooseTarget(int numOfReplicas, + DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes, + Set<Node> excludedNodes) { + return replicator.chooseTarget(filename, numOfReplicas, writer, + chosenNodes, false, excludedNodes, BLOCK_SIZE, + TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a68b6eb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index a653d45..b8a7e77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -26,7 +26,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -39,7 +38,6 @@ import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; @@ -55,52 +53,40 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; -import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.test.PathUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -public class TestReplicationPolicy { - { - GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL); - } +@RunWith(Parameterized.class) +public class TestReplicationPolicy extends BaseReplicationPolicyTest { - private static final int BLOCK_SIZE = 1024; - private static final int NUM_OF_DATANODES = 6; - private static NetworkTopology cluster; - private static NameNode namenode; - private static BlockPlacementPolicy replicator; private static final String filename = "/dummyfile.txt"; - private static DatanodeDescriptor[] dataNodes; - private static DatanodeStorageInfo[] storages; // The interval for marking a datanode as stale, private static final long staleInterval = DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT; @Rule public ExpectedException exception = ExpectedException.none(); - - private static void updateHeartbeatWithUsage(DatanodeDescriptor dn, - long capacity, long dfsUsed, long remaining, long blockPoolUsed, - long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) { - dn.getStorageInfos()[0].setUtilizationForTesting( - capacity, dfsUsed, remaining, blockPoolUsed); - dn.updateHeartbeat( - BlockManagerTestUtil.getStorageReportsForDatanode(dn), - dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); + + public TestReplicationPolicy(String blockPlacementPolicyClassName) { + this.blockPlacementPolicy = blockPlacementPolicyClassName; } - private static void updateHeartbeatForExtraStorage(long capacity, + @Parameterized.Parameters + public static Iterable<Object[]> data() { + return Arrays.asList(new Object[][] { + { BlockPlacementPolicyDefault.class.getName() } }); + } + + private void updateHeartbeatForExtraStorage(long capacity, long dfsUsed, long remaining, long blockPoolUsed) { DatanodeDescriptor dn = dataNodes[5]; dn.getStorageInfos()[1].setUtilizationForTesting( @@ -110,9 +96,19 @@ public class TestReplicationPolicy { 0L, 0L, 0, 0, null); } - @BeforeClass - public static void setupCluster() throws Exception { - Configuration conf = new HdfsConfiguration(); + private void resetHeartbeatForStorages() { + for (int i=0; i < dataNodes.length; i++) { + updateHeartbeatWithUsage(dataNodes[i], + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, + 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, + 0, 0); + } + // No available space in the extra storage of dn0 + updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L); + } + + @Override + DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) { final String[] racks = { "/d1/r1", "/d1/r1", @@ -121,59 +117,13 @@ public class TestReplicationPolicy { "/d2/r3", "/d2/r3"}; storages = DFSTestUtil.createDatanodeStorageInfos(racks); - dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); - // create an extra storage for dn5. DatanodeStorage extraStorage = new DatanodeStorage( storages[5].getStorageID() + "-extra", DatanodeStorage.State.NORMAL, StorageType.DEFAULT); -/* DatanodeStorageInfo si = new DatanodeStorageInfo( - storages[5].getDatanodeDescriptor(), extraStorage); -*/ BlockManagerTestUtil.updateStorage(storages[5].getDatanodeDescriptor(), extraStorage); - - FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); - conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); - File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); - conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, - new File(baseDir, "name").getPath()); - - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); - DFSTestUtil.formatNameNode(conf); - namenode = new NameNode(conf); - - final BlockManager bm = namenode.getNamesystem().getBlockManager(); - replicator = bm.getBlockPlacementPolicy(); - cluster = bm.getDatanodeManager().getNetworkTopology(); - // construct network topology - for (int i=0; i < NUM_OF_DATANODES; i++) { - cluster.add(dataNodes[i]); - bm.getDatanodeManager().getHeartbeatManager().addDatanode( - dataNodes[i]); - } - resetHeartbeatForStorages(); - } - - private static void resetHeartbeatForStorages() { - for (int i=0; i < NUM_OF_DATANODES; i++) { - updateHeartbeatWithUsage(dataNodes[i], - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); - } - // No available space in the extra storage of dn0 - updateHeartbeatForExtraStorage(0L, 0L, 0L, 0L); - } - - private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) { - return isOnSameRack(left, right.getDatanodeDescriptor()); - } - - private static boolean isOnSameRack(DatanodeStorageInfo left, DatanodeDescriptor right) { - return cluster.isOnSameRack(left.getDatanodeDescriptor(), right); + return DFSTestUtil.toDatanodeDescriptor(storages); } /** @@ -269,40 +219,6 @@ public class TestReplicationPolicy { resetHeartbeatForStorages(); } - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) { - return chooseTarget(numOfReplicas, dataNodes[0]); - } - - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - DatanodeDescriptor writer) { - return chooseTarget(numOfReplicas, writer, - new ArrayList<DatanodeStorageInfo>()); - } - - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - List<DatanodeStorageInfo> chosenNodes) { - return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes); - } - - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) { - return chooseTarget(numOfReplicas, writer, chosenNodes, null); - } - - private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - List<DatanodeStorageInfo> chosenNodes, Set<Node> excludedNodes) { - return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes); - } - - private static DatanodeStorageInfo[] chooseTarget( - int numOfReplicas, - DatanodeDescriptor writer, - List<DatanodeStorageInfo> chosenNodes, - Set<Node> excludedNodes) { - return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, - false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); - } - /** * In this testcase, client is dataNodes[0], but the dataNodes[1] is * not allowed to be chosen. So the 1st replica should be @@ -555,7 +471,7 @@ public class TestReplicationPolicy { throws Exception { try { namenode.getNamesystem().getBlockManager().getDatanodeManager() - .setNumStaleNodes(NUM_OF_DATANODES); + .setNumStaleNodes(dataNodes.length); testChooseTargetWithMoreThanAvailableNodes(); } finally { namenode.getNamesystem().getBlockManager().getDatanodeManager() @@ -583,8 +499,8 @@ public class TestReplicationPolicy { // try to choose NUM_OF_DATANODES which is more than actually available // nodes. - DatanodeStorageInfo[] targets = chooseTarget(NUM_OF_DATANODES); - assertEquals(targets.length, NUM_OF_DATANODES - 2); + DatanodeStorageInfo[] targets = chooseTarget(dataNodes.length); + assertEquals(targets.length, dataNodes.length - 2); final List<LoggingEvent> log = appender.getLog(); assertNotNull(log); @@ -1256,7 +1172,7 @@ public class TestReplicationPolicy { // Adding this block will increase its current replication, and that will // remove it from the queue. bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info, - ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]); + ReplicaState.FINALIZED), storages[0]); // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a68b6eb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java index 7ff2930..74db283 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java @@ -20,85 +20,45 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; -import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; -import org.apache.hadoop.hdfs.server.common.StorageInfo; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; -import org.apache.hadoop.test.PathUtils; -import org.apache.hadoop.util.VersionInfo; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -public class TestReplicationPolicyConsiderLoad { +@RunWith(Parameterized.class) +public class TestReplicationPolicyConsiderLoad + extends BaseReplicationPolicyTest { - private static NameNode namenode; - private static DatanodeManager dnManager; - private static List<DatanodeRegistration> dnrList; - private static DatanodeDescriptor[] dataNodes; - private static DatanodeStorageInfo[] storages; + public TestReplicationPolicyConsiderLoad(String blockPlacementPolicy) { + this.blockPlacementPolicy = blockPlacementPolicy; + } + + @Parameterized.Parameters + public static Iterable<Object[]> data() { + return Arrays.asList(new Object[][] { + { BlockPlacementPolicyDefault.class.getName() } }); + } - @BeforeClass - public static void setupCluster() throws IOException { - Configuration conf = new HdfsConfiguration(); + @Override + DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) { final String[] racks = { "/rack1", "/rack1", - "/rack1", "/rack2", "/rack2", - "/rack2"}; + "/rack3", + "/rack3"}; storages = DFSTestUtil.createDatanodeStorageInfos(racks); - dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); - FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); - conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); - File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class); - conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, - new File(baseDir, "name").getPath()); - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true); - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); - conf.setBoolean( - DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); - DFSTestUtil.formatNameNode(conf); - namenode = new NameNode(conf); - int blockSize = 1024; - - dnrList = new ArrayList<DatanodeRegistration>(); - dnManager = namenode.getNamesystem().getBlockManager().getDatanodeManager(); - - // Register DNs - for (int i=0; i < 6; i++) { - DatanodeRegistration dnr = new DatanodeRegistration(dataNodes[i], - new StorageInfo(NodeType.DATA_NODE), new ExportedBlockKeys(), - VersionInfo.getVersion()); - dnrList.add(dnr); - dnManager.registerDatanode(dnr); - dataNodes[i].getStorageInfos()[0].setUtilizationForTesting( - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L, - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*blockSize, 0L); - dataNodes[i].updateHeartbeat( - BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[i]), - 0L, 0L, 0, 0, null); - } + return DFSTestUtil.toDatanodeDescriptor(storages); } private final double EPSILON = 0.0001; @@ -110,46 +70,39 @@ public class TestReplicationPolicyConsiderLoad { public void testChooseTargetWithDecomNodes() throws IOException { namenode.getNamesystem().writeLock(); try { - String blockPoolId = namenode.getNamesystem().getBlockPoolId(); - dnManager.handleHeartbeat(dnrList.get(3), + dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[3], BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]), - blockPoolId, dataNodes[3].getCacheCapacity(), - dataNodes[3].getCacheRemaining(), - 2, 0, 0, null); - dnManager.handleHeartbeat(dnrList.get(4), + dataNodes[3].getCacheCapacity(), + dataNodes[3].getCacheUsed(), + 2, 0, null); + dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[4], BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[4]), - blockPoolId, dataNodes[4].getCacheCapacity(), - dataNodes[4].getCacheRemaining(), - 4, 0, 0, null); - dnManager.handleHeartbeat(dnrList.get(5), + dataNodes[4].getCacheCapacity(), + dataNodes[4].getCacheUsed(), + 4, 0, null); + dnManager.getHeartbeatManager().updateHeartbeat(dataNodes[5], BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[5]), - blockPoolId, dataNodes[5].getCacheCapacity(), - dataNodes[5].getCacheRemaining(), - 4, 0, 0, null); + dataNodes[5].getCacheCapacity(), + dataNodes[5].getCacheUsed(), + 4, 0, null); + // value in the above heartbeats final int load = 2 + 4 + 4; - FSNamesystem fsn = namenode.getNamesystem(); assertEquals((double)load/6, dnManager.getFSClusterStats() .getInServiceXceiverAverage(), EPSILON); // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget() // returns false for (int i = 0; i < 3; i++) { - DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i)); + DatanodeDescriptor d = dataNodes[i]; dnManager.getDecomManager().startDecommission(d); d.setDecommissioned(); } assertEquals((double)load/3, dnManager.getFSClusterStats() .getInServiceXceiverAverage(), EPSILON); - // update references of writer DN to update the de-commissioned state - List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>(); - dnManager.fetchDatanodes(liveNodes, null, false); - DatanodeDescriptor writerDn = null; - if (liveNodes.contains(dataNodes[0])) { - writerDn = liveNodes.get(liveNodes.indexOf(dataNodes[0])); - } + DatanodeDescriptor writerDn = dataNodes[0]; // Call chooseTarget() DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager() @@ -171,10 +124,4 @@ public class TestReplicationPolicyConsiderLoad { } NameNode.LOG.info("Done working on it"); } - - @AfterClass - public static void teardownCluster() { - if (namenode != null) namenode.stop(); - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a68b6eb0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java index e973925..85598ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -32,38 +31,25 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopologyWithNodeGroup; import org.apache.hadoop.net.Node; -import org.apache.hadoop.test.PathUtils; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -public class TestReplicationPolicyWithNodeGroup { - private static final int BLOCK_SIZE = 1024; - private static final int NUM_OF_DATANODES = 8; - private static final int NUM_OF_DATANODES_BOUNDARY = 6; - private static final int NUM_OF_DATANODES_MORE_TARGETS = 12; - private static final int NUM_OF_DATANODES_FOR_DEPENDENCIES = 6; - private final Configuration CONF = new HdfsConfiguration(); - private NetworkTopology cluster; - private NameNode namenode; - private BlockPlacementPolicy replicator; - private static final String filename = "/dummyfile.txt"; - - private static final DatanodeStorageInfo[] storages; - private static final DatanodeDescriptor[] dataNodes; - static { +public class TestReplicationPolicyWithNodeGroup extends BaseReplicationPolicyTest { + public TestReplicationPolicyWithNodeGroup() { + this.blockPlacementPolicy = BlockPlacementPolicyWithNodeGroup.class.getName(); + } + + @Override + DatanodeDescriptor[] getDatanodeDescriptors(Configuration conf) { + conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, + NetworkTopologyWithNodeGroup.class.getName()); final String[] racks = { "/d1/r1/n1", "/d1/r1/n1", @@ -75,7 +61,7 @@ public class TestReplicationPolicyWithNodeGroup { "/d2/r3/n6" }; storages = DFSTestUtil.createDatanodeStorageInfos(racks); - dataNodes = DFSTestUtil.toDatanodeDescriptor(storages); + return DFSTestUtil.toDatanodeDescriptor(storages); } private static final DatanodeStorageInfo[] storagesInBoundaryCase; @@ -142,60 +128,7 @@ public class TestReplicationPolicyWithNodeGroup { dataNodesForDependencies = DFSTestUtil.toDatanodeDescriptor(storagesForDependencies); }; - - @Before - public void setUp() throws Exception { - FileSystem.setDefaultUri(CONF, "hdfs://localhost:0"); - CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); - // Set properties to make HDFS aware of NodeGroup. - CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, - BlockPlacementPolicyWithNodeGroup.class.getName()); - CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, - NetworkTopologyWithNodeGroup.class.getName()); - - CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY, true); - - File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class); - - CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, - new File(baseDir, "name").getPath()); - - DFSTestUtil.formatNameNode(CONF); - namenode = new NameNode(CONF); - final BlockManager bm = namenode.getNamesystem().getBlockManager(); - replicator = bm.getBlockPlacementPolicy(); - cluster = bm.getDatanodeManager().getNetworkTopology(); - // construct network topology - for(int i=0; i<NUM_OF_DATANODES; i++) { - cluster.add(dataNodes[i]); - } - setupDataNodeCapacity(); - } - - @After - public void tearDown() throws Exception { - namenode.stop(); - } - - private static void updateHeartbeatWithUsage(DatanodeDescriptor dn, - long capacity, long dfsUsed, long remaining, long blockPoolUsed, - long dnCacheCapacity, long dnCacheUsed, int xceiverCount, - int volFailures) { - dn.getStorageInfos()[0].setUtilizationForTesting( - capacity, dfsUsed, remaining, blockPoolUsed); - dn.updateHeartbeat( - BlockManagerTestUtil.getStorageReportsForDatanode(dn), - dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null); - } - private static void setupDataNodeCapacity() { - for(int i=0; i<NUM_OF_DATANODES; i++) { - updateHeartbeatWithUsage(dataNodes[i], - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); - } - } - /** * Scan the targets list: all targets should be on different NodeGroups. * Return false if two targets are found on the same NodeGroup. @@ -217,10 +150,6 @@ public class TestReplicationPolicyWithNodeGroup { return true; } - private boolean isOnSameRack(DatanodeStorageInfo left, DatanodeStorageInfo right) { - return isOnSameRack(left.getDatanodeDescriptor(), right); - } - private boolean isOnSameRack(DatanodeDescriptor left, DatanodeStorageInfo right) { return cluster.isOnSameRack(left, right.getDatanodeDescriptor()); } @@ -233,35 +162,6 @@ public class TestReplicationPolicyWithNodeGroup { return cluster.isOnSameNodeGroup(left, right.getDatanodeDescriptor()); } - private DatanodeStorageInfo[] chooseTarget(int numOfReplicas) { - return chooseTarget(numOfReplicas, dataNodes[0]); - } - - private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - DatanodeDescriptor writer) { - return chooseTarget(numOfReplicas, writer, - new ArrayList<DatanodeStorageInfo>()); - } - - private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - List<DatanodeStorageInfo> chosenNodes) { - return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes); - } - - private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, - DatanodeDescriptor writer, List<DatanodeStorageInfo> chosenNodes) { - return chooseTarget(numOfReplicas, writer, chosenNodes, null); - } - - private DatanodeStorageInfo[] chooseTarget( - int numOfReplicas, - DatanodeDescriptor writer, - List<DatanodeStorageInfo> chosenNodes, - Set<Node> excludedNodes) { - return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes, - false, excludedNodes, BLOCK_SIZE, TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY); - } - /** * In this testcase, client is dataNodes[0]. So the 1st replica should be * placed on dataNodes[0], the 2nd replica should be placed on @@ -467,7 +367,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testChooseTarget5() throws Exception { - setupDataNodeCapacity(); + updateHeartbeatWithUsage(); DatanodeStorageInfo[] targets; targets = chooseTarget(0, NODE); assertEquals(targets.length, 0); @@ -514,7 +414,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testRereplicate1() throws Exception { - setupDataNodeCapacity(); + updateHeartbeatWithUsage(); List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>(); chosenNodes.add(storages[0]); DatanodeStorageInfo[] targets; @@ -547,7 +447,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testRereplicate2() throws Exception { - setupDataNodeCapacity(); + updateHeartbeatWithUsage(); List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>(); chosenNodes.add(storages[0]); chosenNodes.add(storages[1]); @@ -575,7 +475,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testRereplicate3() throws Exception { - setupDataNodeCapacity(); + updateHeartbeatWithUsage(); List<DatanodeStorageInfo> chosenNodes = new ArrayList<DatanodeStorageInfo>(); chosenNodes.add(storages[0]); chosenNodes.add(storages[3]); @@ -671,19 +571,14 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testChooseTargetsOnBoundaryTopology() throws Exception { - for(int i=0; i<NUM_OF_DATANODES; i++) { + for(int i=0; i<dataNodes.length; i++) { cluster.remove(dataNodes[i]); } - for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) { + for(int i=0; i<dataNodesInBoundaryCase.length; i++) { cluster.add(dataNodesInBoundaryCase[i]); } - for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) { - updateHeartbeatWithUsage(dataNodes[0], - 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, - (HdfsServerConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, - 0L, 0L, 0L, 0, 0); - + for(int i=0; i<dataNodesInBoundaryCase.length; i++) { updateHeartbeatWithUsage(dataNodesInBoundaryCase[i], 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); @@ -714,7 +609,7 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testRereplicateOnBoundaryTopology() throws Exception { - for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) { + for(int i=0; i<dataNodesInBoundaryCase.length; i++) { updateHeartbeatWithUsage(dataNodesInBoundaryCase[i], 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); @@ -738,21 +633,21 @@ public class TestReplicationPolicyWithNodeGroup { */ @Test public void testChooseMoreTargetsThanNodeGroups() throws Exception { - for(int i=0; i<NUM_OF_DATANODES; i++) { + for(int i=0; i<dataNodes.length; i++) { cluster.remove(dataNodes[i]); } - for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) { + for(int i=0; i<dataNodesInBoundaryCase.length; i++) { DatanodeDescriptor node = dataNodesInBoundaryCase[i]; if (cluster.contains(node)) { cluster.remove(node); } } - for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) { + for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) { cluster.add(dataNodesInMoreTargetsCase[i]); } - for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) { + for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) { updateHeartbeatWithUsage(dataNodesInMoreTargetsCase[i], 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); @@ -773,11 +668,11 @@ public class TestReplicationPolicyWithNodeGroup { @Test public void testChooseTargetWithDependencies() throws Exception { - for(int i=0; i<NUM_OF_DATANODES; i++) { + for(int i=0; i<dataNodes.length; i++) { cluster.remove(dataNodes[i]); } - for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) { + for(int i=0; i<dataNodesInMoreTargetsCase.length; i++) { DatanodeDescriptor node = dataNodesInMoreTargetsCase[i]; if (cluster.contains(node)) { cluster.remove(node); @@ -787,7 +682,7 @@ public class TestReplicationPolicyWithNodeGroup { Host2NodesMap host2DatanodeMap = namenode.getNamesystem() .getBlockManager() .getDatanodeManager().getHost2DatanodeMap(); - for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) { + for(int i=0; i<dataNodesForDependencies.length; i++) { cluster.add(dataNodesForDependencies[i]); host2DatanodeMap.add(dataNodesForDependencies[i]); } @@ -803,7 +698,7 @@ public class TestReplicationPolicyWithNodeGroup { dataNodesForDependencies[3].getHostName()); //Update heartbeat - for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) { + for(int i=0; i<dataNodesForDependencies.length; i++) { updateHeartbeatWithUsage(dataNodesForDependencies[i], 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 2* HdfsServerConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0); @@ -825,8 +720,8 @@ public class TestReplicationPolicyWithNodeGroup { assertTrue(targets[1].equals(storagesForDependencies[3]) || targets[1].equals(storagesForDependencies[4])); //verify that all data nodes are in the excluded list - assertEquals(excludedNodes.size(), NUM_OF_DATANODES_FOR_DEPENDENCIES); - for(int i=0; i<NUM_OF_DATANODES_FOR_DEPENDENCIES; i++) { + assertEquals(excludedNodes.size(), dataNodesForDependencies.length); + for(int i=0; i<dataNodesForDependencies.length; i++) { assertTrue(excludedNodes.contains(dataNodesForDependencies[i])); } }
