HDFS-9354. Fix TestBalancer#testBalancerWithZeroThreadsForMove on Windows.
Contributed by Xiaoyu Yao.
(cherry picked from commit 095ac834022df6136b42961c507ec745c6cf8f97)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d11d2a8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d11d2a8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d11d2a8
Branch: refs/heads/branch-2
Commit: 9d11d2a8f278cdf9af03fe600fc80b93d0de67b7
Parents: 3190446
Author: cnauroth <[email protected]>
Authored: Tue Nov 3 10:51:21 2015 -0800
Committer: cnauroth <[email protected]>
Committed: Tue Nov 3 11:41:50 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/balancer/TestBalancer.java | 573 +++++++++----------
2 files changed, 277 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d11d2a8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8d2ddf5..6d2b226 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1376,6 +1376,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9313. Possible NullPointerException in BlockManager if no excess
replica can be chosen. (mingma)
+ HDFS-9354. Fix TestBalancer#testBalancerWithZeroThreadsForMove on Windows.
+ (Xiaoyu Yao via cnauroth)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d11d2a8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 095241d..e33e586 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;
+import org.junit.After;
import org.junit.Test;
/**
@@ -104,6 +105,14 @@ public class TestBalancer {
final static Path filePath = new Path(fileName);
private MiniDFSCluster cluster;
+ @After
+ public void shutdown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
ClientProtocol client;
static final long TIMEOUT = 40000L; //msec
@@ -348,44 +357,38 @@ public class TestBalancer {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
- try {
- cluster.waitActive();
- client = NameNodeProxies.createProxy(conf,
- cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-
- // fill up the cluster to be 80% full
- long totalCapacity = sum(capacities);
- long totalUsedSpace = totalCapacity * 8 / 10;
- InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
- for (int i = 0; i < favoredNodes.length; i++) {
- // DFSClient will attempt reverse lookup. In case it resolves
- // "127.0.0.1" to "localhost", we manually specify the hostname.
- int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
- favoredNodes[i] = new InetSocketAddress(hosts[i], port);
- }
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+ // fill up the cluster to be 80% full
+ long totalCapacity = sum(capacities);
+ long totalUsedSpace = totalCapacity * 8 / 10;
+ InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+ for (int i = 0; i < favoredNodes.length; i++) {
+ // DFSClient will attempt reverse lookup. In case it resolves
+ // "127.0.0.1" to "localhost", we manually specify the hostname.
+ int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
+ favoredNodes[i] = new InetSocketAddress(hosts[i], port);
+ }
- DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
- totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
- (short) numOfDatanodes, 0, false, favoredNodes);
-
- // start up an empty node with the same capacity
- cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
- new long[] { CAPACITY });
-
- totalCapacity += CAPACITY;
-
- // run balancer and validate results
- waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+ DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+ totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+ (short) numOfDatanodes, 0, false, favoredNodes);
- // start rebalancing
- Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
- assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-
- } finally {
- cluster.shutdown();
- }
-
+ // start up an empty node with the same capacity
+ cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+ new long[] { CAPACITY });
+
+ totalCapacity += CAPACITY;
+
+ // run balancer and validate results
+ waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+ // start rebalancing
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
/**
@@ -569,7 +572,7 @@ public class TestBalancer {
private void doTest(Configuration conf, long[] capacities,
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
boolean useTool, boolean useFile) throws Exception {
- LOG.info("capacities = " + long2String(capacities));
+ LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
LOG.info("newRack = " + newRack);
@@ -587,7 +590,7 @@ public class TestBalancer {
ClientProtocol.class).getProxy();
long totalCapacity = sum(capacities);
-
+
// fill up the cluster to be 30% full
long totalUsedSpace = totalCapacity*3/10;
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
@@ -811,7 +814,7 @@ public class TestBalancer {
/** one-node cluster test*/
private void oneNodeTest(Configuration conf, boolean useTool) throws
Exception {
// add an empty node with half of the CAPACITY & the same rack
- doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2,
+ doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2,
RACK0, useTool);
}
@@ -865,31 +868,27 @@ public class TestBalancer {
.racks(racks)
.simulatedCapacities(capacities)
.build();
- try {
- cluster.waitActive();
- client = NameNodeProxies.createProxy(conf,
- cluster.getFileSystem(0).getUri(),
- ClientProtocol.class).getProxy();
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
- for(int i = 0; i < 3; i++) {
- cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
- }
+ for(int i = 0; i < 3; i++) {
+ cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
+ }
- cluster.startDataNodes(conf, 1, true, null,
- new String[]{RACK0}, null,new long[]{CAPACITY});
- cluster.triggerHeartbeats();
+ cluster.startDataNodes(conf, 1, true, null,
+ new String[]{RACK0}, null,new long[]{CAPACITY});
+ cluster.triggerHeartbeats();
- Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- Set<String> datanodes = new HashSet<String>();
- datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
- BalancerParameters.Builder pBuilder =
- new BalancerParameters.Builder();
- pBuilder.setExcludedNodes(datanodes);
- pBuilder.setRunDuringUpgrade(false);
- final int r = Balancer.run(namenodes, pBuilder.build(), conf);
- assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
- } finally {
- cluster.shutdown();
- }
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ Set<String> datanodes = new HashSet<String>();
+ datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
+ BalancerParameters.Builder pBuilder =
+ new BalancerParameters.Builder();
+ pBuilder.setExcludedNodes(datanodes);
+ pBuilder.setRunDuringUpgrade(false);
+ final int r = Balancer.run(namenodes, pBuilder.build(), conf);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
}
/**
@@ -1322,47 +1321,44 @@ public class TestBalancer {
.storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
.build();
- try {
- cluster.waitActive();
- // Create few files on RAM_DISK
- final String METHOD_NAME = GenericTestUtils.getMethodName();
- final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
-
- DistributedFileSystem fs = cluster.getFileSystem();
- DFSClient client = fs.getClient();
- DFSTestUtil.createFile(fs, path1, true,
- DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
- DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
- DFSTestUtil.createFile(fs, path2, true,
- DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
- DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
-
- // Sleep for a short time to allow the lazy writer thread to do its job
- Thread.sleep(6 * 1000);
-
- // Add another fresh DN with the same type/capacity without files on RAM_DISK
- StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
- long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
- diskStorageLimit}};
- cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
- null, null, storageCapacities, null, false, false, false, null);
-
- cluster.triggerHeartbeats();
- Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-
- // Run Balancer
- final BalancerParameters p = BalancerParameters.DEFAULT;
- final int r = Balancer.run(namenodes, p, conf);
+ cluster.waitActive();
+ // Create few files on RAM_DISK
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+ final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+ DistributedFileSystem fs = cluster.getFileSystem();
+ DFSClient client = fs.getClient();
+ DFSTestUtil.createFile(fs, path1, true,
+ DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+ DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+ DFSTestUtil.createFile(fs, path2, true,
+ DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+ DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+
+ // Sleep for a short time to allow the lazy writer thread to do its job
+ Thread.sleep(6 * 1000);
+
+ // Add another fresh DN with the same type/capacity without files on RAM_DISK
+ StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
+ long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
+ diskStorageLimit}};
+ cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
+ null, null, storageCapacities, null, false, false, false, null);
+
+ cluster.triggerHeartbeats();
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- // Validate no RAM_DISK block should be moved
- assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+ // Run Balancer
+ final BalancerParameters p = BalancerParameters.DEFAULT;
+ final int r = Balancer.run(namenodes, p, conf);
- // Verify files are still on RAM_DISK
- DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
- DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
- } finally {
- cluster.shutdown();
- }
+ // Validate no RAM_DISK block should be moved
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+
+ // Verify files are still on RAM_DISK
+ DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
+ DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
}
/**
@@ -1386,51 +1382,45 @@ public class TestBalancer {
.storageTypes(new StorageType[] { DEFAULT })
.storagesPerDatanode(1)
.build();
+ cluster.waitActive();
+ // Create a file on the single DN
+ final String METHOD_NAME = GenericTestUtils.getMethodName();
+ final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
- try {
- cluster.waitActive();
- // Create a file on the single DN
- final String METHOD_NAME = GenericTestUtils.getMethodName();
- final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-
- DistributedFileSystem fs = cluster.getFileSystem();
- DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
- (short) 1, SEED);
-
- // Add another DN with the same capacity, cluster is now unbalanced
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.triggerHeartbeats();
- Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ DistributedFileSystem fs = cluster.getFileSystem();
+ DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
+ (short) 1, SEED);
- // Run balancer
- final BalancerParameters p = BalancerParameters.DEFAULT;
+ // Add another DN with the same capacity, cluster is now unbalanced
+ cluster.startDataNodes(conf, 1, true, null, null);
+ cluster.triggerHeartbeats();
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
- fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
- fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+ // Run balancer
+ final BalancerParameters p = BalancerParameters.DEFAULT;
- // Rolling upgrade should abort the balancer
- assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
- Balancer.run(namenodes, p, conf));
+ fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+ fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
+ fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
- // Should work with the -runDuringUpgrade flag.
- BalancerParameters.Builder b =
- new BalancerParameters.Builder();
- b.setRunDuringUpgrade(true);
- final BalancerParameters runDuringUpgrade = b.build();
- assertEquals(ExitStatus.SUCCESS.getExitCode(),
- Balancer.run(namenodes, runDuringUpgrade, conf));
+ // Rolling upgrade should abort the balancer
+ assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
+ Balancer.run(namenodes, p, conf));
- // Finalize the rolling upgrade
- fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
+ // Should work with the -runDuringUpgrade flag.
+ BalancerParameters.Builder b =
+ new BalancerParameters.Builder();
+ b.setRunDuringUpgrade(true);
+ final BalancerParameters runDuringUpgrade = b.build();
+ assertEquals(ExitStatus.SUCCESS.getExitCode(),
+ Balancer.run(namenodes, runDuringUpgrade, conf));
- // Should also work after finalization.
- assertEquals(ExitStatus.SUCCESS.getExitCode(),
- Balancer.run(namenodes, p, conf));
+ // Finalize the rolling upgrade
+ fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
- } finally {
- cluster.shutdown();
- }
+ // Should also work after finalization.
+ assertEquals(ExitStatus.SUCCESS.getExitCode(),
+ Balancer.run(namenodes, p, conf));
}
/**
@@ -1452,7 +1442,7 @@ public class TestBalancer {
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
int numOfDatanodes =2;
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2)
.racks(new String[]{"/default/rack0", "/default/rack0"})
.storagesPerDatanode(2)
@@ -1463,39 +1453,33 @@ public class TestBalancer {
{100 * blockSize, 20 * blockSize},
{20 * blockSize, 100 * blockSize}})
.build();
+ cluster.waitActive();
- try {
- cluster.waitActive();
-
- //set "/bar" directory with ONE_SSD storage policy.
- DistributedFileSystem fs = cluster.getFileSystem();
- Path barDir = new Path("/bar");
- fs.mkdir(barDir,new FsPermission((short)777));
- fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
-
- // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
- // and (DN0,SSD) and (DN1,DISK) are about 15% full.
- long fileLen = 30 * blockSize;
- // fooFile has ONE_SSD policy. So
- // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
- // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
- Path fooFile = new Path(barDir, "foo");
- createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
- // update space info
- cluster.triggerHeartbeats();
-
- BalancerParameters p = BalancerParameters.DEFAULT;
- Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
- final int r = Balancer.run(namenodes, p, conf);
-
- // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
- // already has one. Otherwise DN1 will have 2 replicas.
- // For same reason, no replicas were moved.
- assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+ //set "/bar" directory with ONE_SSD storage policy.
+ DistributedFileSystem fs = cluster.getFileSystem();
+ Path barDir = new Path("/bar");
+ fs.mkdir(barDir,new FsPermission((short)777));
+ fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+
+ // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
+ // and (DN0,SSD) and (DN1,DISK) are about 15% full.
+ long fileLen = 30 * blockSize;
+ // fooFile has ONE_SSD policy. So
+ // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
+ // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
+ Path fooFile = new Path(barDir, "foo");
+ createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
+ // update space info
+ cluster.triggerHeartbeats();
+
+ BalancerParameters p = BalancerParameters.DEFAULT;
+ Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ final int r = Balancer.run(namenodes, p, conf);
- } finally {
- cluster.shutdown();
- }
+ // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
+ // already has one. Otherwise DN1 will have 2 replicas.
+ // For same reason, no replicas were moved.
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
}
/**
@@ -1526,50 +1510,46 @@ public class TestBalancer {
int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
.racks(racks).simulatedCapacities(capacities).build();
- try {
- cluster.waitActive();
- client = NameNodeProxies.createProxy(conf,
- cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-
- long totalCapacity = sum(capacities);
-
- // fill up the cluster to be 30% full
- final long totalUsedSpace = totalCapacity * 3 / 10;
- createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
- (short) numOfDatanodes, 0);
- // start up an empty node with the same capacity and on the same rack
- cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
- new long[] { newCapacity });
-
- // Case1: Simulate first balancer by creating 'balancer.id' file. It
- // will keep this file until the balancing operation is completed.
- FileSystem fs = cluster.getFileSystem(0);
- final FSDataOutputStream out = fs
- .create(Balancer.BALANCER_ID_PATH, false);
- out.writeBytes(InetAddress.getLocalHost().getHostName());
- out.hflush();
- assertTrue("'balancer.id' file doesn't exist!",
- fs.exists(Balancer.BALANCER_ID_PATH));
-
- // start second balancer
- final String[] args = { "-policy", "datanode" };
- final Tool tool = new Cli();
- tool.setConf(conf);
- int exitCode = tool.run(args); // start balancing
- assertEquals("Exit status code mismatches",
- ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
-
- // Case2: Release lease so that another balancer would be able to
- // perform balancing.
- out.close();
- assertTrue("'balancer.id' file doesn't exist!",
- fs.exists(Balancer.BALANCER_ID_PATH));
- exitCode = tool.run(args); // start balancing
- assertEquals("Exit status code mismatches",
- ExitStatus.SUCCESS.getExitCode(), exitCode);
- } finally {
- cluster.shutdown();
- }
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+ long totalCapacity = sum(capacities);
+
+ // fill up the cluster to be 30% full
+ final long totalUsedSpace = totalCapacity * 3 / 10;
+ createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+ (short) numOfDatanodes, 0);
+ // start up an empty node with the same capacity and on the same rack
+ cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
+ new long[] { newCapacity });
+
+ // Case1: Simulate first balancer by creating 'balancer.id' file. It
+ // will keep this file until the balancing operation is completed.
+ FileSystem fs = cluster.getFileSystem(0);
+ final FSDataOutputStream out = fs
+ .create(Balancer.BALANCER_ID_PATH, false);
+ out.writeBytes(InetAddress.getLocalHost().getHostName());
+ out.hflush();
+ assertTrue("'balancer.id' file doesn't exist!",
+ fs.exists(Balancer.BALANCER_ID_PATH));
+
+ // start second balancer
+ final String[] args = { "-policy", "datanode" };
+ final Tool tool = new Cli();
+ tool.setConf(conf);
+ int exitCode = tool.run(args); // start balancing
+ assertEquals("Exit status code mismatches",
+ ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
+
+ // Case2: Release lease so that another balancer would be able to
+ // perform balancing.
+ out.close();
+ assertTrue("'balancer.id' file doesn't exist!",
+ fs.exists(Balancer.BALANCER_ID_PATH));
+ exitCode = tool.run(args); // start balancing
+ assertEquals("Exit status code mismatches",
+ ExitStatus.SUCCESS.getExitCode(), exitCode);
}
/** Balancer should not move blocks with size < minBlockSize. */
@@ -1589,102 +1569,97 @@ public class TestBalancer {
.simulatedCapacities(capacities)
.build();
final DistributedFileSystem dfs = cluster.getFileSystem();
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+ ClientProtocol.class).getProxy();
- try {
- cluster.waitActive();
- client = NameNodeProxies.createProxy(conf, dfs.getUri(),
- ClientProtocol.class).getProxy();
-
- // fill up the cluster to be 80% full
- for(int i = 0; i < lengths.length; i++) {
- final long size = lengths[i];
- final Path p = new Path("/file" + i + "_size" + size);
- try(final OutputStream out = dfs.create(p)) {
- for(int j = 0; j < size; j++) {
- out.write(j);
- }
+ // fill up the cluster to be 80% full
+ for(int i = 0; i < lengths.length; i++) {
+ final long size = lengths[i];
+ final Path p = new Path("/file" + i + "_size" + size);
+ try(final OutputStream out = dfs.create(p)) {
+ for(int j = 0; j < size; j++) {
+ out.write(j);
}
}
-
- // start up an empty node with the same capacity
- cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
- LOG.info("capacities = " + Arrays.toString(capacities));
- LOG.info("totalUsedSpace= " + totalUsed);
- LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
- waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
-
- final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-
- { // run Balancer with min-block-size=50
- BalancerParameters.Builder b =
- new BalancerParameters.Builder();
- b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
- b.setThreshold(1);
- final BalancerParameters p = b.build();
-
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
- final int r = Balancer.run(namenodes, p, conf);
- assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
- }
-
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+ }
- { // run Balancer with empty nodes as source nodes
- final Set<String> sourceNodes = new HashSet<>();
- final List<DataNode> datanodes = cluster.getDataNodes();
- for(int i = capacities.length; i < datanodes.size(); i++) {
- sourceNodes.add(datanodes.get(i).getDisplayName());
- }
- BalancerParameters.Builder b =
- new BalancerParameters.Builder();
- b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
- b.setThreshold(1);
- b.setSourceNodes(sourceNodes);
- final BalancerParameters p = b.build();
-
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
- final int r = Balancer.run(namenodes, p, conf);
- assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
- }
+ // start up an empty node with the same capacity
+ cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+ LOG.info("capacities = " + Arrays.toString(capacities));
+ LOG.info("totalUsedSpace= " + totalUsed);
+ LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
+ waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
+
+ final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+ { // run Balancer with min-block-size=50
+ BalancerParameters.Builder b =
+ new BalancerParameters.Builder();
+ b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+ b.setThreshold(1);
+ final BalancerParameters p = b.build();
+
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+ }
- { // run Balancer with a filled node as a source node
- final Set<String> sourceNodes = new HashSet<>();
- final List<DataNode> datanodes = cluster.getDataNodes();
- sourceNodes.add(datanodes.get(0).getDisplayName());
- BalancerParameters.Builder b =
- new BalancerParameters.Builder();
- b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
- b.setThreshold(1);
- b.setSourceNodes(sourceNodes);
- final BalancerParameters p = b.build();
-
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
- final int r = Balancer.run(namenodes, p, conf);
- assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+ { // run Balancer with empty nodes as source nodes
+ final Set<String> sourceNodes = new HashSet<>();
+ final List<DataNode> datanodes = cluster.getDataNodes();
+ for(int i = capacities.length; i < datanodes.size(); i++) {
+ sourceNodes.add(datanodes.get(i).getDisplayName());
}
+ BalancerParameters.Builder b =
+ new BalancerParameters.Builder();
+ b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+ b.setThreshold(1);
+ b.setSourceNodes(sourceNodes);
+ final BalancerParameters p = b.build();
- { // run Balancer with all filled node as source nodes
- final Set<String> sourceNodes = new HashSet<>();
- final List<DataNode> datanodes = cluster.getDataNodes();
- for(int i = 0; i < capacities.length; i++) {
- sourceNodes.add(datanodes.get(i).getDisplayName());
- }
- BalancerParameters.Builder b =
- new BalancerParameters.Builder();
- b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
- b.setThreshold(1);
- b.setSourceNodes(sourceNodes);
- final BalancerParameters p = b.build();
-
- conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
- final int r = Balancer.run(namenodes, p, conf);
- assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+ }
+
+ { // run Balancer with a filled node as a source node
+ final Set<String> sourceNodes = new HashSet<>();
+ final List<DataNode> datanodes = cluster.getDataNodes();
+ sourceNodes.add(datanodes.get(0).getDisplayName());
+ BalancerParameters.Builder b =
+ new BalancerParameters.Builder();
+ b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+ b.setThreshold(1);
+ b.setSourceNodes(sourceNodes);
+ final BalancerParameters p = b.build();
+
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+ }
+
+ { // run Balancer with all filled node as source nodes
+ final Set<String> sourceNodes = new HashSet<>();
+ final List<DataNode> datanodes = cluster.getDataNodes();
+ for(int i = 0; i < capacities.length; i++) {
+ sourceNodes.add(datanodes.get(i).getDisplayName());
}
- } finally {
- cluster.shutdown();
+ BalancerParameters.Builder b =
+ new BalancerParameters.Builder();
+ b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+ b.setThreshold(1);
+ b.setSourceNodes(sourceNodes);
+ final BalancerParameters p = b.build();
+
+ conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+ final int r = Balancer.run(namenodes, p, conf);
+ assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
}
}
-
+
/**
* @param args
*/