Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java Tue Aug 19 23:49:39 2014 @@ -158,21 +158,23 @@ public class UpgradeUtilities { FileUtil.fullyDelete(new File(datanodeStorage,"in_use.lock")); } namenodeStorageChecksum = checksumContents(NAME_NODE, - new File(namenodeStorage, "current")); + new File(namenodeStorage, "current"), false); File dnCurDir = new File(datanodeStorage, "current"); - datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir); + datanodeStorageChecksum = checksumContents(DATA_NODE, dnCurDir, false); File bpCurDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir), "current"); - blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir); + blockPoolStorageChecksum = checksumContents(DATA_NODE, bpCurDir, false); File bpCurFinalizeDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir), "current/"+DataStorage.STORAGE_DIR_FINALIZED); - blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE, bpCurFinalizeDir); + blockPoolFinalizedStorageChecksum = checksumContents(DATA_NODE, + bpCurFinalizeDir, true); File bpCurRbwDir = new File(BlockPoolSliceStorage.getBpRoot(bpid, dnCurDir), "current/"+DataStorage.STORAGE_DIR_RBW); - blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir); + blockPoolRbwStorageChecksum = checksumContents(DATA_NODE, bpCurRbwDir, + false); } // Private 
helper method that writes a file to the given file system. @@ -266,36 +268,47 @@ public class UpgradeUtilities { /** * Compute the checksum of all the files in the specified directory. - * The contents of subdirectories are not included. This method provides - * an easy way to ensure equality between the contents of two directories. + * This method provides an easy way to ensure equality between the contents + * of two directories. * * @param nodeType if DATA_NODE then any file named "VERSION" is ignored. * This is because this file file is changed every time * the Datanode is started. - * @param dir must be a directory. Subdirectories are ignored. + * @param dir must be a directory + * @param recursive whether or not to consider subdirectories * * @throws IllegalArgumentException if specified directory is not a directory * @throws IOException if an IOException occurs while reading the files * @return the computed checksum value */ - public static long checksumContents(NodeType nodeType, File dir) throws IOException { + public static long checksumContents(NodeType nodeType, File dir, + boolean recursive) throws IOException { + CRC32 checksum = new CRC32(); + checksumContentsHelper(nodeType, dir, checksum, recursive); + return checksum.getValue(); + } + + public static void checksumContentsHelper(NodeType nodeType, File dir, + CRC32 checksum, boolean recursive) throws IOException { if (!dir.isDirectory()) { throw new IllegalArgumentException( - "Given argument is not a directory:" + dir); + "Given argument is not a directory:" + dir); } File[] list = dir.listFiles(); Arrays.sort(list); - CRC32 checksum = new CRC32(); for (int i = 0; i < list.length; i++) { if (!list[i].isFile()) { + if (recursive) { + checksumContentsHelper(nodeType, list[i], checksum, recursive); + } continue; } // skip VERSION and dfsUsed file for DataNodes - if (nodeType == DATA_NODE && - (list[i].getName().equals("VERSION") || - list[i].getName().equals("dfsUsed"))) { - continue; + if (nodeType 
== DATA_NODE && + (list[i].getName().equals("VERSION") || + list[i].getName().equals("dfsUsed"))) { + continue; } FileInputStream fis = null; @@ -312,7 +325,6 @@ public class UpgradeUtilities { } } } - return checksum.getValue(); } /**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestExtendedBlock.java Tue Aug 19 23:49:39 2014 @@ -49,6 +49,26 @@ public class TestExtendedBlock { new ExtendedBlock(POOL_A, BLOCK_1_GS1), new ExtendedBlock(POOL_A, BLOCK_1_GS2)); } + + @Test + public void testHashcode() { + + // Different pools, same block id -> different hashcode + assertNotEquals( + new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(), + new ExtendedBlock(POOL_B, BLOCK_1_GS1).hashCode()); + + // Same pool, different block id -> different hashcode + assertNotEquals( + new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(), + new ExtendedBlock(POOL_A, BLOCK_2_GS1).hashCode()); + + // Same block -> same hashcode + assertEquals( + new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode(), + new ExtendedBlock(POOL_A, BLOCK_1_GS1).hashCode()); + + } private static void assertNotEquals(Object a, Object b) { assertFalse("expected not equal: '" + a + "' and '" + b + "'", Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java?rev=1619012&r1=1619011&r2=1619012&view=diff 
============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java Tue Aug 19 23:49:39 2014 @@ -31,25 +31,25 @@ import org.apache.hadoop.fs.permission.A import org.apache.hadoop.fs.permission.AclEntryType; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeStorageProto; +import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlocksWithLocationsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CheckpointSignatureProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; 
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; @@ -67,9 +67,18 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; -import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; @@ -175,8 +184,10 @@ public class TestPBHelper { private static BlockWithLocations getBlockWithLocations(int bid) { final String[] datanodeUuids = {"dn1", "dn2", "dn3"}; final String[] storageIDs = {"s1", "s2", "s3"}; + final StorageType[] storageTypes = { + StorageType.DISK, StorageType.DISK, StorageType.DISK}; return new BlockWithLocations(new Block(bid, 0, 1), - datanodeUuids, storageIDs); + 
datanodeUuids, storageIDs, storageTypes); } private void compare(BlockWithLocations locs1, BlockWithLocations locs2) { @@ -550,8 +561,10 @@ public class TestPBHelper { dnInfos[1][0] = DFSTestUtil.getLocalDatanodeInfo(); dnInfos[1][1] = DFSTestUtil.getLocalDatanodeInfo(); String[][] storageIDs = {{"s00"}, {"s10", "s11"}}; + StorageType[][] storageTypes = {{StorageType.DEFAULT}, + {StorageType.DEFAULT, StorageType.DEFAULT}}; BlockCommand bc = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, "bp1", - blocks, dnInfos, storageIDs); + blocks, dnInfos, storageTypes, storageIDs); BlockCommandProto bcProto = PBHelper.convert(bc); BlockCommand bc2 = PBHelper.convert(bcProto); assertEquals(bc.getAction(), bc2.getAction()); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java Tue Aug 19 23:49:39 2014 @@ -22,8 +22,12 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; import java.io.IOException; +import java.net.BindException; import java.net.URI; +import java.util.Random; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; @@ -37,14 +41,13 @@ public class MiniQJMHACluster { private 
MiniDFSCluster cluster; private MiniJournalCluster journalCluster; private final Configuration conf; + private static final Log LOG = LogFactory.getLog(MiniQJMHACluster.class); public static final String NAMESERVICE = "ns1"; private static final String NN1 = "nn1"; private static final String NN2 = "nn2"; - private static final int NN1_IPC_PORT = 10000; - private static final int NN1_INFO_PORT = 10001; - private static final int NN2_IPC_PORT = 10002; - private static final int NN2_INFO_PORT = 10003; + private static final Random RANDOM = new Random(); + private int basePort = 10000; public static class Builder { private final Configuration conf; @@ -69,51 +72,62 @@ public class MiniQJMHACluster { } } - public static MiniDFSNNTopology createDefaultTopology() { + public static MiniDFSNNTopology createDefaultTopology(int basePort) { return new MiniDFSNNTopology() .addNameservice(new MiniDFSNNTopology.NSConf(NAMESERVICE).addNN( - new MiniDFSNNTopology.NNConf("nn1").setIpcPort(NN1_IPC_PORT) - .setHttpPort(NN1_INFO_PORT)).addNN( - new MiniDFSNNTopology.NNConf("nn2").setIpcPort(NN2_IPC_PORT) - .setHttpPort(NN2_INFO_PORT))); + new MiniDFSNNTopology.NNConf("nn1").setIpcPort(basePort) + .setHttpPort(basePort + 1)).addNN( + new MiniDFSNNTopology.NNConf("nn2").setIpcPort(basePort + 2) + .setHttpPort(basePort + 3))); } - + private MiniQJMHACluster(Builder builder) throws IOException { this.conf = builder.conf; - // start 3 journal nodes - journalCluster = new MiniJournalCluster.Builder(conf).format(true) - .build(); - URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE); - - // start cluster with 2 NameNodes - MiniDFSNNTopology topology = createDefaultTopology(); - - initHAConf(journalURI, builder.conf); - - // First start up the NNs just to format the namespace. The MinIDFSCluster - // has no way to just format the NameNodes without also starting them. 
- cluster = builder.dfsBuilder.nnTopology(topology) - .manageNameDfsSharedDirs(false).build(); - cluster.waitActive(); - cluster.shutdown(); - - // initialize the journal nodes - Configuration confNN0 = cluster.getConfiguration(0); - NameNode.initializeSharedEdits(confNN0, true); - - cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt); - cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt); - - // restart the cluster - cluster.restartNameNodes(); + int retryCount = 0; + while (true) { + try { + basePort = 10000 + RANDOM.nextInt(1000) * 4; + // start 3 journal nodes + journalCluster = new MiniJournalCluster.Builder(conf).format(true) + .build(); + URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE); + + // start cluster with 2 NameNodes + MiniDFSNNTopology topology = createDefaultTopology(basePort); + + initHAConf(journalURI, builder.conf); + + // First start up the NNs just to format the namespace. The MinIDFSCluster + // has no way to just format the NameNodes without also starting them. 
+ cluster = builder.dfsBuilder.nnTopology(topology) + .manageNameDfsSharedDirs(false).build(); + cluster.waitActive(); + cluster.shutdown(); + + // initialize the journal nodes + Configuration confNN0 = cluster.getConfiguration(0); + NameNode.initializeSharedEdits(confNN0, true); + + cluster.getNameNodeInfos()[0].setStartOpt(builder.startOpt); + cluster.getNameNodeInfos()[1].setStartOpt(builder.startOpt); + + // restart the cluster + cluster.restartNameNodes(); + ++retryCount; + break; + } catch (BindException e) { + LOG.info("MiniQJMHACluster port conflicts, retried " + + retryCount + " times"); + } + } } private Configuration initHAConf(URI journalURI, Configuration conf) { conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, journalURI.toString()); - String address1 = "127.0.0.1:" + NN1_IPC_PORT; - String address2 = "127.0.0.1:" + NN2_IPC_PORT; + String address1 = "127.0.0.1:" + basePort; + String address2 = "127.0.0.1:" + (basePort + 2); conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, NAMESERVICE, NN1), address1); conf.set(DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Tue Aug 19 23:49:39 2014 @@ -21,16 +21,12 @@ import static org.junit.Assert.*; import java.io.File; import java.io.IOException; -import java.net.URL; -import 
java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -41,6 +37,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; + public class TestNNWithQJM { final Configuration conf = new HdfsConfiguration(); private MiniJournalCluster mjc = null; @@ -204,55 +201,4 @@ public class TestNNWithQJM { "Unable to start log segment 1: too few journals", ioe); } } - - @Test (timeout = 30000) - public void testWebPageHasQjmInfo() throws Exception { - conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, - MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image"); - conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, - mjc.getQuorumJournalURI("myjournal").toString()); - // Speed up the test - conf.setInt( - CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); - - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(0) - .manageNameDfsDirs(false) - .build(); - try { - URL url = new URL("http://localhost:" - + NameNode.getHttpAddress(cluster.getConfiguration(0)).getPort() - + "/dfshealth.jsp"); - - cluster.getFileSystem().mkdirs(TEST_PATH); - - String contents = DFSTestUtil.urlGet(url); - assertTrue(contents.contains("QJM to [")); - assertTrue(contents.contains("Written txid 2")); - - // Stop one JN, do another txn, and make sure it shows as behind - // stuck behind the others. 
- mjc.getJournalNode(0).stopAndJoin(0); - - cluster.getFileSystem().delete(TEST_PATH, true); - - contents = DFSTestUtil.urlGet(url); - System.out.println(contents); - assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents) - .find()); - - // Restart NN while JN0 is still down. - cluster.restartNameNode(); - - contents = DFSTestUtil.urlGet(url); - System.out.println(contents); - assertTrue(Pattern.compile("never written").matcher(contents) - .find()); - - - } finally { - cluster.shutdown(); - } - - } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Tue Aug 19 23:49:39 2014 @@ -25,7 +25,7 @@ import java.io.IOException; import java.net.URI; import java.util.List; -import junit.framework.Assert; +import org.junit.Assert; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; @@ -208,7 +208,7 @@ public class TestQuorumJournalManagerUni anyLong(), eq(1L), eq(1), Mockito.<byte[]>any()); // And the third log not respond - SettableFuture<Void> slowLog = SettableFuture.<Void>create(); + SettableFuture<Void> slowLog = SettableFuture.create(); Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits( anyLong(), eq(1L), eq(1), Mockito.<byte[]>any()); stm.flush(); Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Tue Aug 19 23:49:39 2014 @@ -170,11 +170,6 @@ public class TestJournalNode { assertTrue("Bad contents: " + pageContents, pageContents.contains( "Hadoop:service=JournalNode,name=JvmMetrics")); - - // Check JSP page. - pageContents = DFSTestUtil.urlGet( - new URL(urlRoot + "/journalstatus.jsp")); - assertTrue(pageContents.contains("JournalNode")); // Create some edits on server side byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java Tue Aug 19 23:49:39 2014 @@ 
-45,6 +45,7 @@ import org.apache.hadoop.hdfs.web.WebHdf import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; import org.apache.hadoop.security.TestDoAsEffectiveUser; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.DefaultImpersonationProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.token.Token; import org.junit.AfterClass; @@ -88,7 +89,8 @@ public class TestDelegationTokenForProxy builder.append("127.0.1.1,"); builder.append(InetAddress.getLocalHost().getCanonicalHostName()); LOG.info("Local Ip addresses: " + builder.toString()); - conf.setStrings(ProxyUsers.getProxySuperuserIpConfKey(superUserShortName), + conf.setStrings(DefaultImpersonationProvider.getTestProvider(). + getProxySuperuserIpConfKey(superUserShortName), builder.toString()); } @@ -100,7 +102,8 @@ public class TestDelegationTokenForProxy DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY, 10000); config.setLong( DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY, 5000); - config.setStrings(ProxyUsers.getProxySuperuserGroupConfKey(REAL_USER), + config.setStrings(DefaultImpersonationProvider.getTestProvider(). 
+ getProxySuperuserGroupConfKey(REAL_USER), "group1"); config.setBoolean( DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Tue Aug 19 23:49:39 2014 @@ -18,17 +18,23 @@ package org.apache.hadoop.hdfs.server.balancer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.File; import java.io.IOException; +import java.io.PrintWriter; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -48,6 +54,8 @@ import org.apache.hadoop.hdfs.protocol.E import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; +import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import 
org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -81,7 +89,7 @@ public class TestBalancer { private static final Random r = new Random(); static { - Balancer.setBlockMoveWaitTime(1000L) ; + Dispatcher.setBlockMoveWaitTime(1000L) ; } static void initConf(Configuration conf) { @@ -255,6 +263,18 @@ public class TestBalancer { } } } + + /** + * Wait until balanced: each datanode gives utilization within + * BALANCE_ALLOWED_VARIANCE of average + * @throws IOException + * @throws TimeoutException + */ + static void waitForBalancer(long totalUsedSpace, long totalCapacity, + ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p) + throws IOException, TimeoutException { + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0); + } /** * Wait until balanced: each datanode gives utilization within @@ -263,11 +283,17 @@ public class TestBalancer { * @throws TimeoutException */ static void waitForBalancer(long totalUsedSpace, long totalCapacity, - ClientProtocol client, MiniDFSCluster cluster) - throws IOException, TimeoutException { + ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p, + int expectedExcludedNodes) throws IOException, TimeoutException { long timeout = TIMEOUT; long failtime = (timeout <= 0L) ? 
Long.MAX_VALUE : Time.now() + timeout; + if (!p.nodesToBeIncluded.isEmpty()) { + totalCapacity = p.nodesToBeIncluded.size() * CAPACITY; + } + if (!p.nodesToBeExcluded.isEmpty()) { + totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY; + } final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; boolean balanced; do { @@ -275,9 +301,20 @@ public class TestBalancer { client.getDatanodeReport(DatanodeReportType.ALL); assertEquals(datanodeReport.length, cluster.getDataNodes().size()); balanced = true; + int actualExcludedNodeCount = 0; for (DatanodeInfo datanode : datanodeReport) { double nodeUtilization = ((double)datanode.getDfsUsed()) / datanode.getCapacity(); + if (Dispatcher.Util.isExcluded(p.nodesToBeExcluded, datanode)) { + assertTrue(nodeUtilization == 0); + actualExcludedNodeCount++; + continue; + } + if (!Dispatcher.Util.isIncluded(p.nodesToBeIncluded, datanode)) { + assertTrue(nodeUtilization == 0); + actualExcludedNodeCount++; + continue; + } if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) { balanced = false; if (Time.now() > failtime) { @@ -294,6 +331,7 @@ public class TestBalancer { break; } } + assertEquals(expectedExcludedNodes,actualExcludedNodeCount); } while (!balanced); } @@ -307,22 +345,118 @@ public class TestBalancer { } return b.append("]").toString(); } - /** This test start a cluster with specified number of nodes, + /** + * Class which contains information about the + * new nodes to be added to the cluster for balancing. 
+ */ + static abstract class NewNodeInfo { + + Set<String> nodesToBeExcluded = new HashSet<String>(); + Set<String> nodesToBeIncluded = new HashSet<String>(); + + abstract String[] getNames(); + abstract int getNumberofNewNodes(); + abstract int getNumberofIncludeNodes(); + abstract int getNumberofExcludeNodes(); + + public Set<String> getNodesToBeIncluded() { + return nodesToBeIncluded; + } + public Set<String> getNodesToBeExcluded() { + return nodesToBeExcluded; + } + } + + /** + * The host names of new nodes are specified + */ + static class HostNameBasedNodes extends NewNodeInfo { + String[] hostnames; + + public HostNameBasedNodes(String[] hostnames, + Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) { + this.hostnames = hostnames; + this.nodesToBeExcluded = nodesToBeExcluded; + this.nodesToBeIncluded = nodesToBeIncluded; + } + + @Override + String[] getNames() { + return hostnames; + } + @Override + int getNumberofNewNodes() { + return hostnames.length; + } + @Override + int getNumberofIncludeNodes() { + return nodesToBeIncluded.size(); + } + @Override + int getNumberofExcludeNodes() { + return nodesToBeExcluded.size(); + } + } + + /** + * The number of data nodes to be started are specified. + * The data nodes will have same host name, but different port numbers. 
+ * + */ + static class PortNumberBasedNodes extends NewNodeInfo { + int newNodes; + int excludeNodes; + int includeNodes; + + public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) { + this.newNodes = newNodes; + this.excludeNodes = excludeNodes; + this.includeNodes = includeNodes; + } + + @Override + String[] getNames() { + return null; + } + @Override + int getNumberofNewNodes() { + return newNodes; + } + @Override + int getNumberofIncludeNodes() { + return includeNodes; + } + @Override + int getNumberofExcludeNodes() { + return excludeNodes; + } + } + + private void doTest(Configuration conf, long[] capacities, String[] racks, + long newCapacity, String newRack, boolean useTool) throws Exception { + doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false); + } + + /** This test starts a cluster with specified number of nodes, * and fills it to be 30% full (with a single file replicated identically * to all datanodes); * It then adds one new empty node and starts balancing. - * + * * @param conf - configuration * @param capacities - array of capacities of original nodes in cluster * @param racks - array of racks for original nodes in cluster * @param newCapacity - new node's capacity * @param newRack - new node's rack + * @param nodes - information about new nodes to be started. * @param useTool - if true run test via Cli with command-line argument * parsing, etc. Otherwise invoke balancer API directly. + * @param useFile - if true, the hosts to be included or excluded will be stored in a + * file and then later read from the file. 
* @throws Exception */ - private void doTest(Configuration conf, long[] capacities, String[] racks, - long newCapacity, String newRack, boolean useTool) throws Exception { + private void doTest(Configuration conf, long[] capacities, + String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, + boolean useTool, boolean useFile) throws Exception { LOG.info("capacities = " + long2String(capacities)); LOG.info("racks = " + Arrays.asList(racks)); LOG.info("newCapacity= " + newCapacity); @@ -346,17 +480,75 @@ public class TestBalancer { long totalUsedSpace = totalCapacity*3/10; createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0); - // start up an empty node with the same capacity and on the same rack - cluster.startDataNodes(conf, 1, true, null, - new String[]{newRack}, new long[]{newCapacity}); - totalCapacity += newCapacity; + if (nodes == null) { // there is no specification of new nodes. + // start up an empty node with the same capacity and on the same rack + cluster.startDataNodes(conf, 1, true, null, + new String[]{newRack}, null,new long[]{newCapacity}); + totalCapacity += newCapacity; + } else { + //if running a test with "include list", include original nodes as well + if (nodes.getNumberofIncludeNodes()>0) { + for (DataNode dn: cluster.getDataNodes()) + nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName()); + } + String[] newRacks = new String[nodes.getNumberofNewNodes()]; + long[] newCapacities = new long[nodes.getNumberofNewNodes()]; + for (int i=0; i < nodes.getNumberofNewNodes(); i++) { + newRacks[i] = newRack; + newCapacities[i] = newCapacity; + } + // if host names are specified for the new nodes to be created. 
+ if (nodes.getNames() != null) { + cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null, + newRacks, nodes.getNames(), newCapacities); + totalCapacity += newCapacity*nodes.getNumberofNewNodes(); + } else { // host names are not specified + cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null, + newRacks, null, newCapacities); + totalCapacity += newCapacity*nodes.getNumberofNewNodes(); + //populate the include nodes + if (nodes.getNumberofIncludeNodes() > 0) { + int totalNodes = cluster.getDataNodes().size(); + for (int i=0; i < nodes.getNumberofIncludeNodes(); i++) { + nodes.getNodesToBeIncluded().add (cluster.getDataNodes().get( + totalNodes-1-i).getDatanodeId().getXferAddr()); + } + } + //polulate the exclude nodes + if (nodes.getNumberofExcludeNodes() > 0) { + int totalNodes = cluster.getDataNodes().size(); + for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) { + nodes.getNodesToBeExcluded().add (cluster.getDataNodes().get( + totalNodes-1-i).getDatanodeId().getXferAddr()); + } + } + } + } + // run balancer and validate results + Balancer.Parameters p = Balancer.Parameters.DEFAULT; + if (nodes != null) { + p = new Balancer.Parameters( + Balancer.Parameters.DEFAULT.policy, + Balancer.Parameters.DEFAULT.threshold, + nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded()); + } + + int expectedExcludedNodes = 0; + if (nodes != null) { + if (!nodes.getNodesToBeExcluded().isEmpty()) { + expectedExcludedNodes = nodes.getNodesToBeExcluded().size(); + } else if (!nodes.getNodesToBeIncluded().isEmpty()) { + expectedExcludedNodes = + cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size(); + } + } // run balancer and validate results if (useTool) { - runBalancerCli(conf, totalUsedSpace, totalCapacity); + runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes); } else { - runBalancer(conf, totalUsedSpace, totalCapacity); + runBalancer(conf, totalUsedSpace, totalCapacity, p, 
expectedExcludedNodes); } } finally { cluster.shutdown(); @@ -365,31 +557,86 @@ public class TestBalancer { private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity) throws Exception { + runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0); + } + + private void runBalancer(Configuration conf, + long totalUsedSpace, long totalCapacity, Balancer.Parameters p, + int excludedNodes) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); - + final int r = Balancer.run(namenodes, p, conf); + if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { + assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); + return; + } else { + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); + } waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); LOG.info("Rebalancing with default ctor."); - waitForBalancer(totalUsedSpace, totalCapacity, client, cluster); + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); } - + private void runBalancerCli(Configuration conf, - long totalUsedSpace, long totalCapacity) throws Exception { + long totalUsedSpace, long totalCapacity, + Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); + List <String> args = new ArrayList<String>(); + args.add("-policy"); + args.add("datanode"); + + File excludeHostsFile = null; + if (!p.nodesToBeExcluded.isEmpty()) { + args.add("-exclude"); + if (useFile) { + excludeHostsFile = new File ("exclude-hosts-file"); + PrintWriter pw = new PrintWriter(excludeHostsFile); + for (String 
host: p.nodesToBeExcluded) { + pw.write( host + "\n"); + } + pw.close(); + args.add("-f"); + args.add("exclude-hosts-file"); + } else { + args.add(StringUtils.join(p.nodesToBeExcluded, ',')); + } + } + + File includeHostsFile = null; + if (!p.nodesToBeIncluded.isEmpty()) { + args.add("-include"); + if (useFile) { + includeHostsFile = new File ("include-hosts-file"); + PrintWriter pw = new PrintWriter(includeHostsFile); + for (String host: p.nodesToBeIncluded){ + pw.write( host + "\n"); + } + pw.close(); + args.add("-f"); + args.add("include-hosts-file"); + } else { + args.add(StringUtils.join(p.nodesToBeIncluded, ',')); + } + } - final String[] args = { "-policy", "datanode" }; final Tool tool = new Cli(); tool.setConf(conf); - final int r = tool.run(args); // start rebalancing + final int r = tool.run(args.toArray(new String[0])); // start rebalancing assertEquals("Tools should exit 0 on success", 0, r); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); LOG.info("Rebalancing with default ctor."); - waitForBalancer(totalUsedSpace, totalCapacity, client, cluster); + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes); + + if (excludeHostsFile != null && excludeHostsFile.exists()) { + excludeHostsFile.delete(); + } + if (includeHostsFile != null && includeHostsFile.exists()) { + includeHostsFile.delete(); + } } /** one-node cluster test*/ @@ -411,6 +658,71 @@ public class TestBalancer { oneNodeTest(conf, false); } + /* we first start a cluster and fill the cluster up to a certain size. + * then redistribute blocks according the required distribution. + * Then we start an empty datanode. + * Afterwards a balancer is run to balance the cluster. + * A partially filled datanode is excluded during balancing. + * This triggers a situation where one of the block's location is unknown. 
+ */ + @Test(timeout=100000) + public void testUnknownDatanode() throws Exception { + Configuration conf = new HdfsConfiguration(); + initConf(conf); + long distribution[] = new long[] {50*CAPACITY/100, 70*CAPACITY/100, 0*CAPACITY/100}; + long capacities[] = new long[]{CAPACITY, CAPACITY, CAPACITY}; + String racks[] = new String[] {RACK0, RACK1, RACK1}; + + int numDatanodes = distribution.length; + if (capacities.length != numDatanodes || racks.length != numDatanodes) { + throw new IllegalArgumentException("Array length is not the same"); + } + + // calculate total space that need to be filled + final long totalUsedSpace = sum(distribution); + + // fill the cluster + ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace, + (short) numDatanodes); + + // redistribute blocks + Block[][] blocksDN = distributeBlocks( + blocks, (short)(numDatanodes-1), distribution); + + // restart the cluster: do NOT format the cluster + conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f"); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes) + .format(false) + .racks(racks) + .simulatedCapacities(capacities) + .build(); + try { + cluster.waitActive(); + client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), + ClientProtocol.class).getProxy(); + + for(int i = 0; i < 3; i++) { + cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null); + } + + cluster.startDataNodes(conf, 1, true, null, + new String[]{RACK0}, null,new long[]{CAPACITY}); + cluster.triggerHeartbeats(); + + Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); + Set<String> datanodes = new HashSet<String>(); + datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); + Balancer.Parameters p = new Balancer.Parameters( + Balancer.Parameters.DEFAULT.policy, + Balancer.Parameters.DEFAULT.threshold, + datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); + final int r = Balancer.run(namenodes, p, conf); + 
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); + } finally { + cluster.shutdown(); + } + } + /** * Test parse method in Balancer#Cli class with threshold value out of * boundaries. @@ -435,7 +747,7 @@ public class TestBalancer { } } - /** Test a cluster with even distribution, + /** Test a cluster with even distribution, * then a new empty node is added to the cluster*/ @Test(timeout=100000) public void testBalancer0() throws Exception { @@ -463,6 +775,20 @@ public class TestBalancer { } @Test(timeout=100000) + public void testBalancerWithZeroThreadsForMove() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0); + testBalancer1Internal (conf); + } + + @Test(timeout=100000) + public void testBalancerWithNonZeroThreadsForMove() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8); + testBalancer1Internal (conf); + } + + @Test(timeout=100000) public void testBalancer2() throws Exception { testBalancer2Internal(new HdfsConfiguration()); } @@ -528,14 +854,49 @@ public class TestBalancer { } catch (IllegalArgumentException e) { } - parameters = new String[] { "-threshold 1 -policy" }; + parameters = new String[] {"-threshold", "1", "-policy"}; + try { + Balancer.Cli.parse(parameters); + fail(reason); + } catch (IllegalArgumentException e) { + + } + parameters = new String[] {"-threshold", "1", "-include"}; try { Balancer.Cli.parse(parameters); fail(reason); } catch (IllegalArgumentException e) { } + parameters = new String[] {"-threshold", "1", "-exclude"}; + try { + Balancer.Cli.parse(parameters); + fail(reason); + } catch (IllegalArgumentException e) { + } + parameters = new String[] {"-include", "-f"}; + try { + Balancer.Cli.parse(parameters); + fail(reason); + } catch (IllegalArgumentException e) { + + } + parameters = new String[] {"-exclude", "-f"}; + try { + 
Balancer.Cli.parse(parameters); + fail(reason); + } catch (IllegalArgumentException e) { + + } + + parameters = new String[] {"-include", "testnode1", "-exclude", "testnode2"}; + try { + Balancer.Cli.parse(parameters); + fail("IllegalArgumentException is expected when both -exclude and -include are specified"); + } catch (IllegalArgumentException e) { + + } } /** @@ -551,6 +912,183 @@ public class TestBalancer { } /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerWithExcludeList() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set<String> excludeHosts = new HashSet<String>(); + excludeHosts.add( "datanodeY"); + excludeHosts.add( "datanodeZ"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerWithExcludeListWithPorts() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerCliWithExcludeList() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set<String> excludeHosts = new HashSet<String>(); + excludeHosts.add( "datanodeY"); + 
excludeHosts.add( "datanodeZ"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts, + Parameters.DEFAULT.nodesToBeIncluded), true, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerCliWithExcludeListWithPorts() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list in a file + */ + @Test(timeout=100000) + public void testBalancerCliWithExcludeListInAFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set<String> excludeHosts = new HashSet<String>(); + excludeHosts.add( "datanodeY"); + excludeHosts.add( "datanodeZ"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true); + } + + /** + * Test a cluster with even distribution,G + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true); + } + + /** + * Test a cluster with even 
distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerWithIncludeList() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set<String> includeHosts = new HashSet<String>(); + includeHosts.add( "datanodeY"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerWithIncludeListWithPorts() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerCliWithIncludeList() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set<String> includeHosts = new HashSet<String>(); + includeHosts.add( "datanodeY"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerCliWithIncludeListWithPorts() throws 
Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerCliWithIncludeListInAFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set<String> includeHosts = new HashSet<String>(); + includeHosts.add( "datanodeY"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true); + } + + /** * @param args */ public static void main(String[] args) throws Exception { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java Tue Aug 19 23:49:39 2014 @@ -44,7 +44,7 @@ public class TestBalancerWithHANameNodes ClientProtocol client; static { - Balancer.setBlockMoveWaitTime(1000L); + Dispatcher.setBlockMoveWaitTime(1000L); } /** @@ -97,10 +97,10 @@ public class TestBalancerWithHANameNodes Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, - cluster); + cluster, Balancer.Parameters.DEFAULT); } finally { cluster.shutdown(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java Tue 
Aug 19 23:49:39 2014 @@ -73,7 +73,7 @@ public class TestBalancerWithMultipleNam private static final Random RANDOM = new Random(); static { - Balancer.setBlockMoveWaitTime(1000L) ; + Dispatcher.setBlockMoveWaitTime(1000L) ; } /** Common objects used in various methods. */ @@ -159,8 +159,8 @@ public class TestBalancerWithMultipleNam // start rebalancing final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf); - Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf); + Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r); LOG.info("BALANCER 2"); wait(s.clients, totalUsed, totalCapacity); @@ -195,7 +195,7 @@ public class TestBalancerWithMultipleNam balanced = true; for(int d = 0; d < used.length; d++) { final double p = used[d]*100.0/cap[d]; - balanced = p <= avg + Balancer.Parameters.DEFALUT.threshold; + balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold; if (!balanced) { if (i % 100 == 0) { LOG.warn("datanodes " + d + " is not yet balanced: " Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Tue Aug 19 23:49:39 2014 @@ -22,8 +22,9 @@ import static 
org.junit.Assert.assertEqu import java.io.IOException; import java.net.URI; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; @@ -39,6 +40,9 @@ import org.apache.hadoop.hdfs.MiniDFSClu import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup; import org.apache.hadoop.net.NetworkTopology; @@ -53,7 +57,7 @@ public class TestBalancerWithNodeGroup { private static final Log LOG = LogFactory.getLog( "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup"); - final private static long CAPACITY = 6000L; + final private static long CAPACITY = 5000L; final private static String RACK0 = "/rack0"; final private static String RACK1 = "/rack1"; final private static String NODEGROUP0 = "/nodegroup0"; @@ -71,12 +75,13 @@ public class TestBalancerWithNodeGroup { static final int DEFAULT_BLOCK_SIZE = 100; static { - Balancer.setBlockMoveWaitTime(1000L) ; + Dispatcher.setBlockMoveWaitTime(1000L) ; } static Configuration createConf() { Configuration conf = new HdfsConfiguration(); TestBalancer.initConf(conf); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY, NetworkTopologyWithNodeGroup.class.getName()); conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, @@ -170,8 +175,8 @@ public class TestBalancerWithNodeGroup { // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); - 
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); - assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + assertEquals(ExitStatus.SUCCESS.getExitCode(), r); waitForHeartBeat(totalUsedSpace, totalCapacity); LOG.info("Rebalancing with default factor."); @@ -184,13 +189,26 @@ public class TestBalancerWithNodeGroup { // start rebalancing Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); - Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code || - (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code)); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() || + (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode())); waitForHeartBeat(totalUsedSpace, totalCapacity); LOG.info("Rebalancing with default factor."); } + private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) { + Set<ExtendedBlock> ret = new HashSet<ExtendedBlock>(); + for (LocatedBlock blk : blks) { + for (DatanodeInfo di : blk.getLocations()) { + if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) { + ret.add(blk.getBlock()); + break; + } + } + } + return ret; + } + /** * Create a cluster with even distribution, and a new empty node is added to * the cluster, then test rack locality for balancer policy. 
@@ -220,9 +238,14 @@ public class TestBalancerWithNodeGroup { // fill up the cluster to be 30% full long totalUsedSpace = totalCapacity * 3 / 10; - TestBalancer.createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, + long length = totalUsedSpace / numOfDatanodes; + TestBalancer.createFile(cluster, filePath, length, (short) numOfDatanodes, 0); + LocatedBlocks lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, + length); + Set<ExtendedBlock> before = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0); + long newCapacity = CAPACITY; String newRack = RACK1; String newNodeGroup = NODEGROUP2; @@ -235,22 +258,9 @@ public class TestBalancerWithNodeGroup { // run balancer and validate results runBalancerCanFinish(conf, totalUsedSpace, totalCapacity); - DatanodeInfo[] datanodeReport = - client.getDatanodeReport(DatanodeReportType.ALL); - - Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>(); - for (DatanodeInfo datanode: datanodeReport) { - String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation()); - int usedCapacity = (int) datanode.getDfsUsed(); - - if (rackToUsedCapacity.get(rack) != null) { - rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack)); - } else { - rackToUsedCapacity.put(rack, usedCapacity); - } - } - assertEquals(rackToUsedCapacity.size(), 2); - assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1)); + lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length); + Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0); + assertEquals(before, after); } finally { cluster.shutdown(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Tue Aug 19 23:49:39 2014 @@ -101,7 +101,6 @@ public class BlockManagerTestUtil { } /** - * @param blockManager * @return replication monitor thread instance from block manager. */ public static Daemon getReplicationThread(final BlockManager blockManager) @@ -111,7 +110,6 @@ public class BlockManagerTestUtil { /** * Stop the replication monitor thread - * @param blockManager */ public static void stopReplicationThread(final BlockManager blockManager) throws IOException { @@ -126,7 +124,6 @@ public class BlockManagerTestUtil { } /** - * @param blockManager * @return corruptReplicas from block manager */ public static CorruptReplicasMap getCorruptReplicas(final BlockManager blockManager){ @@ -135,7 +132,6 @@ public class BlockManagerTestUtil { } /** - * @param blockManager * @return computed block replication and block invalidation work that can be * scheduled on data-nodes. * @throws IOException @@ -158,7 +154,7 @@ public class BlockManagerTestUtil { * regardless of invalidation/replication limit configurations. * * NB: you may want to set - * {@link DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to + * {@link DFSConfigKeys#DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to * a high value to ensure that all work is calculated. 
*/ public static int computeAllPendingWork(BlockManager bm) { @@ -200,7 +196,7 @@ public class BlockManagerTestUtil { /** * Change whether the block placement policy will prefer the writer's * local Datanode or not. - * @param prefer + * @param prefer if true, prefer local node */ public static void setWritingPrefersLocalNode( BlockManager bm, boolean prefer) { @@ -240,8 +236,13 @@ public class BlockManagerTestUtil { public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, String rackLocation, DatanodeStorage storage) { + return getDatanodeDescriptor(ipAddr, rackLocation, storage, "host"); + } + + public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr, + String rackLocation, DatanodeStorage storage, String hostname) { DatanodeDescriptor dn = DFSTestUtil.getDatanodeDescriptor(ipAddr, - DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation); + DFSConfigKeys.DFS_DATANODE_DEFAULT_PORT, rackLocation, hostname); if (storage != null) { dn.updateStorage(storage); } @@ -267,4 +268,14 @@ public class BlockManagerTestUtil { } return reports.toArray(StorageReport.EMPTY_ARRAY); } + + /** + * Have DatanodeManager check decommission state. 
+ * @param dm the DatanodeManager to manipulate + */ + public static void checkDecommissionState(DatanodeManager dm, + DatanodeDescriptor node) { + dm.checkDecommissionState(node); + } + } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java Tue Aug 19 23:49:39 2014 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -29,6 +30,8 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.junit.Assert; import org.junit.Test; /** @@ -42,6 +45,41 @@ public class TestBlockInfo { private static final Log LOG = LogFactory .getLog("org.apache.hadoop.hdfs.TestBlockInfo"); + + @Test + public void testAddStorage() throws Exception { + BlockInfo blockInfo = new BlockInfo(3); + + final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1"); + + boolean added = blockInfo.addStorage(storage); + + Assert.assertTrue(added); + Assert.assertEquals(storage, 
blockInfo.getStorageInfo(0)); + } + + + @Test + public void testReplaceStorage() throws Exception { + + // Create two dummy storages. + final DatanodeStorageInfo storage1 = DFSTestUtil.createDatanodeStorageInfo("storageID1", "127.0.0.1"); + final DatanodeStorageInfo storage2 = new DatanodeStorageInfo(storage1.getDatanodeDescriptor(), new DatanodeStorage("storageID2")); + final int NUM_BLOCKS = 10; + BlockInfo[] blockInfos = new BlockInfo[NUM_BLOCKS]; + + // Create a few dummy blocks and add them to the first storage. + for (int i = 0; i < NUM_BLOCKS; ++i) { + blockInfos[i] = new BlockInfo(3); + storage1.addBlock(blockInfos[i]); + } + + // Try to move one of the blocks to a different storage. + boolean added = storage2.addBlock(blockInfos[NUM_BLOCKS/2]); + Assert.assertThat(added, is(false)); + Assert.assertThat(blockInfos[NUM_BLOCKS/2].getStorageInfo(0), is(storage2)); + } + @Test public void testBlockListMoveToHead() throws Exception { LOG.info("BlockInfo moveToHead tests..."); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Aug 19 23:49:39 2014 @@ -368,7 +368,7 @@ public class TestBlockManager { DatanodeStorageInfo[] pipeline) throws IOException { for (int i = 1; i < pipeline.length; i++) { DatanodeStorageInfo storage = pipeline[i]; - 
bm.addBlock(storage.getDatanodeDescriptor(), storage.getStorageID(), blockInfo, null); + bm.addBlock(storage, blockInfo, null); blockInfo.addStorage(storage); } } @@ -549,12 +549,12 @@ public class TestBlockManager { // send block report, should be processed reset(node); - bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); // send block report again, should NOT be processed reset(node); - bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); @@ -566,7 +566,7 @@ public class TestBlockManager { assertEquals(0, ds.getBlockReportCount()); // ready for report again // send block report, should be processed after restart reset(node); - bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); } @@ -595,7 +595,7 @@ public class TestBlockManager { // send block report while pretending to already have blocks reset(node); doReturn(1).when(node).numBlocks(); - bm.processReport(node, new DatanodeStorage(ds.getStorageID()), "pool", + bm.processReport(node, new DatanodeStorage(ds.getStorageID()), new BlockListAsLongs(null, null)); assertEquals(1, ds.getBlockReportCount()); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java?rev=1619012&r1=1619011&r2=1619012&view=diff 
============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java Tue Aug 19 23:49:39 2014 @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -160,7 +161,8 @@ public class TestBlockTokenWithDFS { setConfiguration(conf). setRemotePeerFactory(new RemotePeerFactory() { @Override - public Peer newConnectedPeer(InetSocketAddress addr) + public Peer newConnectedPeer(InetSocketAddress addr, + Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) throws IOException { Peer peer = null; Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket(); @@ -209,6 +211,8 @@ public class TestBlockTokenWithDFS { conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, numDataNodes); conf.setInt("ipc.client.connect.max.retries", 0); + // Set short retry timeouts so this test runs faster + conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10); return conf; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCorruptReplicaInfo.java Tue Aug 19 23:49:39 2014 @@ -33,6 +33,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason; import org.junit.Test; @@ -89,14 +90,14 @@ public class TestCorruptReplicaInfo { DatanodeDescriptor dn1 = DFSTestUtil.getLocalDatanodeDescriptor(); DatanodeDescriptor dn2 = DFSTestUtil.getLocalDatanodeDescriptor(); - crm.addToCorruptReplicasMap(getBlock(0), dn1, "TEST"); + addToCorruptReplicasMap(crm, getBlock(0), dn1); assertEquals("Number of corrupt blocks not returning correctly", 1, crm.size()); - crm.addToCorruptReplicasMap(getBlock(1), dn1, "TEST"); + addToCorruptReplicasMap(crm, getBlock(1), dn1); assertEquals("Number of corrupt blocks not returning correctly", 2, crm.size()); - crm.addToCorruptReplicasMap(getBlock(1), dn2, "TEST"); + addToCorruptReplicasMap(crm, getBlock(1), dn2); assertEquals("Number of corrupt blocks not returning correctly", 2, crm.size()); @@ -109,7 +110,7 @@ public class TestCorruptReplicaInfo { 0, crm.size()); for (Long block_id: block_ids) { - crm.addToCorruptReplicasMap(getBlock(block_id), dn1, "TEST"); + addToCorruptReplicasMap(crm, getBlock(block_id), dn1); } assertEquals("Number of corrupt blocks not returning 
correctly", @@ -127,4 +128,9 @@ public class TestCorruptReplicaInfo { crm.getCorruptReplicaBlockIds(10, 7L))); } + + private static void addToCorruptReplicasMap(CorruptReplicasMap crm, + Block blk, DatanodeDescriptor dn) { + crm.addToCorruptReplicasMap(blk, dn, "TEST", Reason.NONE); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java Tue Aug 19 23:49:39 2014 @@ -63,16 +63,16 @@ public class TestDatanodeDescriptor { assertTrue(storages.length > 0); final String storageID = storages[0].getStorageID(); // add first block - assertTrue(dd.addBlock(storageID, blk)); + assertTrue(storages[0].addBlock(blk)); assertEquals(1, dd.numBlocks()); // remove a non-existent block assertFalse(dd.removeBlock(blk1)); assertEquals(1, dd.numBlocks()); // add an existent block - assertFalse(dd.addBlock(storageID, blk)); + assertFalse(storages[0].addBlock(blk)); assertEquals(1, dd.numBlocks()); // add second block - assertTrue(dd.addBlock(storageID, blk1)); + assertTrue(storages[0].addBlock(blk1)); assertEquals(2, dd.numBlocks()); // remove first block assertTrue(dd.removeBlock(blk)); Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingDataNodeMessages.java Tue Aug 19 23:49:39 2014 @@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.DFSTestUti import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.junit.Test; import com.google.common.base.Joiner; @@ -43,8 +44,10 @@ public class TestPendingDataNodeMessages @Test public void testQueues() { DatanodeDescriptor fakeDN = DFSTestUtil.getLocalDatanodeDescriptor(); - msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs1, ReplicaState.FINALIZED); - msgs.enqueueReportedBlock(fakeDN, "STORAGE_ID", block1Gs2, ReplicaState.FINALIZED); + DatanodeStorage storage = new DatanodeStorage("STORAGE_ID"); + DatanodeStorageInfo storageInfo = new DatanodeStorageInfo(fakeDN, storage); + msgs.enqueueReportedBlock(storageInfo, block1Gs1, ReplicaState.FINALIZED); + msgs.enqueueReportedBlock(storageInfo, block1Gs2, ReplicaState.FINALIZED); assertEquals(2, msgs.count()); Modified: 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Aug 19 23:49:39 2014 @@ -82,7 +82,7 @@ public class TestReplicationPolicy { private static NameNode namenode; private static BlockPlacementPolicy replicator; private static final String filename = "/dummyfile.txt"; - private static DatanodeDescriptor dataNodes[]; + private static DatanodeDescriptor[] dataNodes; private static DatanodeStorageInfo[] storages; // The interval for marking a datanode as stale, private static final long staleInterval = @@ -905,49 +905,46 @@ public class TestReplicationPolicy { */ @Test public void testChooseReplicaToDelete() throws Exception { - List<DatanodeDescriptor> replicaNodeList = new - ArrayList<DatanodeDescriptor>(); - final Map<String, List<DatanodeDescriptor>> rackMap - = new HashMap<String, List<DatanodeDescriptor>>(); + List<DatanodeStorageInfo> replicaList = new ArrayList<DatanodeStorageInfo>(); + final Map<String, List<DatanodeStorageInfo>> rackMap + = new HashMap<String, List<DatanodeStorageInfo>>(); dataNodes[0].setRemaining(4*1024*1024); - replicaNodeList.add(dataNodes[0]); + replicaList.add(storages[0]); dataNodes[1].setRemaining(3*1024*1024); - replicaNodeList.add(dataNodes[1]); + replicaList.add(storages[1]); 
dataNodes[2].setRemaining(2*1024*1024); - replicaNodeList.add(dataNodes[2]); + replicaList.add(storages[2]); dataNodes[5].setRemaining(1*1024*1024); - replicaNodeList.add(dataNodes[5]); + replicaList.add(storages[5]); // Refresh the last update time for all the datanodes for (int i = 0; i < dataNodes.length; i++) { dataNodes[i].setLastUpdate(Time.now()); } - List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>(); - List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>(); - replicator.splitNodesWithRack( - replicaNodeList, rackMap, first, second); - // dataNodes[0] and dataNodes[1] are in first set as their rack has two - // replica nodes, while datanodes[2] and dataNodes[5] are in second set. + List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>(); + List<DatanodeStorageInfo> second = new ArrayList<DatanodeStorageInfo>(); + replicator.splitNodesWithRack(replicaList, rackMap, first, second); + // storages[0] and storages[1] are in first set as their rack has two + // replica nodes, while storages[2] and storages[5] are in second set. 
assertEquals(2, first.size()); assertEquals(2, second.size()); - DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete( + DatanodeStorageInfo chosen = replicator.chooseReplicaToDelete( null, null, (short)3, first, second); - // Within first set, dataNodes[1] with less free space - assertEquals(chosenNode, dataNodes[1]); + // Within first set, storages[1] with less free space + assertEquals(chosen, storages[1]); - replicator.adjustSetsWithChosenReplica( - rackMap, first, second, chosenNode); + replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosen); assertEquals(0, first.size()); assertEquals(3, second.size()); - // Within second set, dataNodes[5] with less free space - chosenNode = replicator.chooseReplicaToDelete( + // Within second set, storages[5] with less free space + chosen = replicator.chooseReplicaToDelete( null, null, (short)2, first, second); - assertEquals(chosenNode, dataNodes[5]); + assertEquals(chosen, storages[5]); } /** @@ -1121,8 +1118,7 @@ public class TestReplicationPolicy { // Adding this block will increase its current replication, and that will // remove it from the queue. bm.addStoredBlockUnderConstruction(new StatefulBlockInfo(info, info, - ReplicaState.FINALIZED), TestReplicationPolicy.dataNodes[0], - "STORAGE"); + ReplicaState.FINALIZED), TestReplicationPolicy.storages[0]); // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED.