HDFS-9354. Fix TestBalancer#testBalancerWithZeroThreadsForMove on Windows. 
Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/095ac834
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/095ac834
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/095ac834

Branch: refs/heads/HDFS-7240
Commit: 095ac834022df6136b42961c507ec745c6cf8f97
Parents: 0783184
Author: cnauroth <cnaur...@apache.org>
Authored: Tue Nov 3 10:51:21 2015 -0800
Committer: cnauroth <cnaur...@apache.org>
Committed: Tue Nov 3 11:21:08 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/balancer/TestBalancer.java      | 572 +++++++++----------
 2 files changed, 277 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/095ac834/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 879c015..1729b73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2219,6 +2219,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9313. Possible NullPointerException in BlockManager if no excess
     replica can be chosen. (mingma)
 
+    HDFS-9354. Fix TestBalancer#testBalancerWithZeroThreadsForMove on Windows.
+    (Xiaoyu Yao via cnauroth)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/095ac834/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 332ae15..dd54345 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
+import org.junit.After;
 import org.junit.Test;
 
 /**
@@ -106,6 +107,14 @@ public class TestBalancer {
   final static Path filePath = new Path(fileName);
   private MiniDFSCluster cluster;
 
+  @After
+  public void shutdown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
   ClientProtocol client;
 
   static final long TIMEOUT = 40000L; //msec
@@ -367,44 +376,38 @@ public class TestBalancer {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
         .hosts(hosts).racks(racks).simulatedCapacities(capacities).build();
 
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf,
-          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-      
-      // fill up the cluster to be 80% full
-      long totalCapacity = sum(capacities);
-      long totalUsedSpace = totalCapacity * 8 / 10;
-      InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
-      for (int i = 0; i < favoredNodes.length; i++) {
-        // DFSClient will attempt reverse lookup. In case it resolves
-        // "127.0.0.1" to "localhost", we manually specify the hostname.
-        int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
-        favoredNodes[i] = new InetSocketAddress(hosts[i], port);
-      }
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+    // fill up the cluster to be 80% full
+    long totalCapacity = sum(capacities);
+    long totalUsedSpace = totalCapacity * 8 / 10;
+    InetSocketAddress[] favoredNodes = new InetSocketAddress[numOfDatanodes];
+    for (int i = 0; i < favoredNodes.length; i++) {
+      // DFSClient will attempt reverse lookup. In case it resolves
+      // "127.0.0.1" to "localhost", we manually specify the hostname.
+      int port = cluster.getDataNodes().get(i).getXferAddress().getPort();
+      favoredNodes[i] = new InetSocketAddress(hosts[i], port);
+    }
 
-      DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
-          totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
-          (short) numOfDatanodes, 0, false, favoredNodes);
-      
-      // start up an empty node with the same capacity
-      cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
-          new long[] { CAPACITY });
-      
-      totalCapacity += CAPACITY;
-      
-      // run balancer and validate results
-      waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+    DFSTestUtil.createFile(cluster.getFileSystem(0), filePath, false, 1024,
+        totalUsedSpace / numOfDatanodes, DEFAULT_BLOCK_SIZE,
+        (short) numOfDatanodes, 0, false, favoredNodes);
 
-      // start rebalancing
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-      int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
-      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-      
-    } finally {
-      cluster.shutdown();
-    }
-    
+    // start up an empty node with the same capacity
+    cluster.startDataNodes(conf, 1, true, null, new String[] { RACK2 },
+        new long[] { CAPACITY });
+
+    totalCapacity += CAPACITY;
+
+    // run balancer and validate results
+    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+
+    // start rebalancing
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
   }
   
   /**
@@ -588,7 +591,7 @@ public class TestBalancer {
   private void doTest(Configuration conf, long[] capacities,
       String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
       boolean useTool, boolean useFile) throws Exception {
-    LOG.info("capacities = " +  long2String(capacities)); 
+    LOG.info("capacities = " +  long2String(capacities));
     LOG.info("racks      = " +  Arrays.asList(racks)); 
     LOG.info("newCapacity= " +  newCapacity); 
     LOG.info("newRack    = " +  newRack); 
@@ -606,7 +609,7 @@ public class TestBalancer {
           ClientProtocol.class).getProxy();
 
       long totalCapacity = sum(capacities);
-      
+
       // fill up the cluster to be 30% full
       long totalUsedSpace = totalCapacity*3/10;
       createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
@@ -830,7 +833,7 @@ public class TestBalancer {
   /** one-node cluster test*/
   private void oneNodeTest(Configuration conf, boolean useTool) throws 
Exception {
     // add an empty node with half of the CAPACITY & the same rack
-    doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, 
+    doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2,
             RACK0, useTool);
   }
   
@@ -884,31 +887,27 @@ public class TestBalancer {
         .racks(racks)
         .simulatedCapacities(capacities)
         .build();
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf, 
cluster.getFileSystem(0).getUri(),
-          ClientProtocol.class).getProxy();
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf, 
cluster.getFileSystem(0).getUri(),
+        ClientProtocol.class).getProxy();
 
-      for(int i = 0; i < 3; i++) {
-        cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
-      }
+    for(int i = 0; i < 3; i++) {
+      cluster.injectBlocks(i, Arrays.asList(blocksDN[i]), null);
+    }
 
-      cluster.startDataNodes(conf, 1, true, null,
-          new String[]{RACK0}, null,new long[]{CAPACITY});
-      cluster.triggerHeartbeats();
+    cluster.startDataNodes(conf, 1, true, null,
+        new String[]{RACK0}, null,new long[]{CAPACITY});
+    cluster.triggerHeartbeats();
 
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-      Set<String>  datanodes = new HashSet<String>();
-      
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
-      BalancerParameters.Builder pBuilder =
-          new BalancerParameters.Builder();
-      pBuilder.setExcludedNodes(datanodes);
-      pBuilder.setRunDuringUpgrade(false);
-      final int r = Balancer.run(namenodes, pBuilder.build(), conf);
-      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
-    } finally {
-      cluster.shutdown();
-    }
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    Set<String>  datanodes = new HashSet<String>();
+    datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
+    BalancerParameters.Builder pBuilder =
+        new BalancerParameters.Builder();
+    pBuilder.setExcludedNodes(datanodes);
+    pBuilder.setRunDuringUpgrade(false);
+    final int r = Balancer.run(namenodes, pBuilder.build(), conf);
+    assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
   }
 
   /**
@@ -1339,47 +1338,44 @@ public class TestBalancer {
       .storageTypes(new StorageType[] { RAM_DISK, DEFAULT })
       .build();
 
-    try {
-      cluster.waitActive();
-      // Create few files on RAM_DISK
-      final String METHOD_NAME = GenericTestUtils.getMethodName();
-      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-      final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
-
-      DistributedFileSystem fs = cluster.getFileSystem();
-      DFSClient client = fs.getClient();
-      DFSTestUtil.createFile(fs, path1, true,
-        DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
-        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
-      DFSTestUtil.createFile(fs, path2, true,
-        DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
-        DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
-
-      // Sleep for a short time to allow the lazy writer thread to do its job
-      Thread.sleep(6 * 1000);
-
-      // Add another fresh DN with the same type/capacity without files on 
RAM_DISK
-      StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
-      long[][] storageCapacities = new long[][]{{ramDiskStorageLimit, 
diskStorageLimit}};
-      cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
-        null, null, storageCapacities, null, false, false, false, null);
-
-      cluster.triggerHeartbeats();
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    cluster.waitActive();
+    // Create few files on RAM_DISK
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+    final Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
+
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSClient client = fs.getClient();
+    DFSTestUtil.createFile(fs, path1, true,
+      DEFAULT_RAM_DISK_BLOCK_SIZE, 4 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+      DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+    DFSTestUtil.createFile(fs, path2, true,
+      DEFAULT_RAM_DISK_BLOCK_SIZE, 1 * DEFAULT_RAM_DISK_BLOCK_SIZE,
+      DEFAULT_RAM_DISK_BLOCK_SIZE, REPL_FACT, SEED, true);
+
+    // Sleep for a short time to allow the lazy writer thread to do its job
+    Thread.sleep(6 * 1000);
+
+    // Add another fresh DN with the same type/capacity without files on 
RAM_DISK
+    StorageType[][] storageTypes = new StorageType[][] {{RAM_DISK, DEFAULT}};
+    long[][] storageCapacities = new long[][]{{ramDiskStorageLimit,
+        diskStorageLimit}};
+    cluster.startDataNodes(conf, REPL_FACT, storageTypes, true, null,
+      null, null, storageCapacities, null, false, false, false, null);
+
+    cluster.triggerHeartbeats();
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
 
-      // Run Balancer
-      final BalancerParameters p = BalancerParameters.DEFAULT;
-      final int r = Balancer.run(namenodes, p, conf);
+    // Run Balancer
+    final BalancerParameters p = BalancerParameters.DEFAULT;
+    final int r = Balancer.run(namenodes, p, conf);
 
-      // Validate no RAM_DISK block should be moved
-      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+    // Validate no RAM_DISK block should be moved
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
 
-      // Verify files are still on RAM_DISK
-      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
-      DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
-    } finally {
-      cluster.shutdown();
-    }
+    // Verify files are still on RAM_DISK
+    DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path1, RAM_DISK);
+    DFSTestUtil.verifyFileReplicasOnStorageType(fs, client, path2, RAM_DISK);
   }
 
   /**
@@ -1403,51 +1399,45 @@ public class TestBalancer {
         .storageTypes(new StorageType[] { DEFAULT })
         .storagesPerDatanode(1)
         .build();
+    cluster.waitActive();
+    // Create a file on the single DN
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
 
-    try {
-      cluster.waitActive();
-      // Create a file on the single DN
-      final String METHOD_NAME = GenericTestUtils.getMethodName();
-      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
-
-      DistributedFileSystem fs = cluster.getFileSystem();
-      DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
-          (short) 1, SEED);
-
-      // Add another DN with the same capacity, cluster is now unbalanced
-      cluster.startDataNodes(conf, 1, true, null, null);
-      cluster.triggerHeartbeats();
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
+        (short) 1, SEED);
 
-      // Run balancer
-      final BalancerParameters p = BalancerParameters.DEFAULT;
+    // Add another DN with the same capacity, cluster is now unbalanced
+    cluster.startDataNodes(conf, 1, true, null, null);
+    cluster.triggerHeartbeats();
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
 
-      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
-      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
-      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    // Run balancer
+    final BalancerParameters p = BalancerParameters.DEFAULT;
 
-      // Rolling upgrade should abort the balancer
-      assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
-          Balancer.run(namenodes, p, conf));
+    fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+    fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
+    fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
 
-      // Should work with the -runDuringUpgrade flag.
-      BalancerParameters.Builder b =
-          new BalancerParameters.Builder();
-      b.setRunDuringUpgrade(true);
-      final BalancerParameters runDuringUpgrade = b.build();
-      assertEquals(ExitStatus.SUCCESS.getExitCode(),
-          Balancer.run(namenodes, runDuringUpgrade, conf));
+    // Rolling upgrade should abort the balancer
+    assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
+        Balancer.run(namenodes, p, conf));
 
-      // Finalize the rolling upgrade
-      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
+    // Should work with the -runDuringUpgrade flag.
+    BalancerParameters.Builder b =
+        new BalancerParameters.Builder();
+    b.setRunDuringUpgrade(true);
+    final BalancerParameters runDuringUpgrade = b.build();
+    assertEquals(ExitStatus.SUCCESS.getExitCode(),
+        Balancer.run(namenodes, runDuringUpgrade, conf));
 
-      // Should also work after finalization.
-      assertEquals(ExitStatus.SUCCESS.getExitCode(),
-          Balancer.run(namenodes, p, conf));
+    // Finalize the rolling upgrade
+    fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
 
-    } finally {
-      cluster.shutdown();
-    }
+    // Should also work after finalization.
+    assertEquals(ExitStatus.SUCCESS.getExitCode(),
+        Balancer.run(namenodes, p, conf));
   }
 
   /**
@@ -1469,7 +1459,7 @@ public class TestBalancer {
     conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
 
     int numOfDatanodes =2;
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+    cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(2)
         .racks(new String[]{"/default/rack0", "/default/rack0"})
         .storagesPerDatanode(2)
@@ -1480,39 +1470,33 @@ public class TestBalancer {
             {100 * blockSize, 20 * blockSize},
             {20 * blockSize, 100 * blockSize}})
         .build();
+    cluster.waitActive();
 
-    try {
-      cluster.waitActive();
-
-      //set "/bar" directory with ONE_SSD storage policy.
-      DistributedFileSystem fs = cluster.getFileSystem();
-      Path barDir = new Path("/bar");
-      fs.mkdir(barDir,new FsPermission((short)777));
-      fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
-
-      // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
-      // and (DN0,SSD) and (DN1,DISK) are about 15% full.
-      long fileLen  = 30 * blockSize;
-      // fooFile has ONE_SSD policy. So
-      // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
-      // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
-      Path fooFile = new Path(barDir, "foo");
-      createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
-      // update space info
-      cluster.triggerHeartbeats();
-
-      BalancerParameters p = BalancerParameters.DEFAULT;
-      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-      final int r = Balancer.run(namenodes, p, conf);
-
-      // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
-      // already has one. Otherwise DN1 will have 2 replicas.
-      // For same reason, no replicas were moved.
-      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+    //set "/bar" directory with ONE_SSD storage policy.
+    DistributedFileSystem fs = cluster.getFileSystem();
+    Path barDir = new Path("/bar");
+    fs.mkdir(barDir,new FsPermission((short)777));
+    fs.setStoragePolicy(barDir, HdfsConstants.ONESSD_STORAGE_POLICY_NAME);
+
+    // Insert 30 blocks. So (DN0,SSD) and (DN1,DISK) are about half full,
+    // and (DN0,SSD) and (DN1,DISK) are about 15% full.
+    long fileLen  = 30 * blockSize;
+    // fooFile has ONE_SSD policy. So
+    // (DN0,SSD) and (DN1,DISK) have 2 replicas belong to same block.
+    // (DN0,DISK) and (DN1,SSD) have 2 replicas belong to same block.
+    Path fooFile = new Path(barDir, "foo");
+    createFile(cluster, fooFile, fileLen, (short) numOfDatanodes, 0);
+    // update space info
+    cluster.triggerHeartbeats();
+
+    BalancerParameters p = BalancerParameters.DEFAULT;
+    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    final int r = Balancer.run(namenodes, p, conf);
 
-    } finally {
-      cluster.shutdown();
-    }
+    // Replica in (DN0,SSD) was not moved to (DN1,SSD), because (DN1,DISK)
+    // already has one. Otherwise DN1 will have 2 replicas.
+    // For same reason, no replicas were moved.
+    assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
   }
 
   /**
@@ -1543,50 +1527,46 @@ public class TestBalancer {
     int numOfDatanodes = capacities.length;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(capacities.length)
         .racks(racks).simulatedCapacities(capacities).build();
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf,
-          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
-
-      long totalCapacity = sum(capacities);
-
-      // fill up the cluster to be 30% full
-      final long totalUsedSpace = totalCapacity * 3 / 10;
-      createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
-          (short) numOfDatanodes, 0);
-      // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
-          new long[] { newCapacity });
-
-      // Case1: Simulate first balancer by creating 'balancer.id' file. It
-      // will keep this file until the balancing operation is completed.
-      FileSystem fs = cluster.getFileSystem(0);
-      final FSDataOutputStream out = fs
-          .create(Balancer.BALANCER_ID_PATH, false);
-      out.writeBytes(InetAddress.getLocalHost().getHostName());
-      out.hflush();
-      assertTrue("'balancer.id' file doesn't exist!",
-          fs.exists(Balancer.BALANCER_ID_PATH));
-
-      // start second balancer
-      final String[] args = { "-policy", "datanode" };
-      final Tool tool = new Cli();
-      tool.setConf(conf);
-      int exitCode = tool.run(args); // start balancing
-      assertEquals("Exit status code mismatches",
-          ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
-
-      // Case2: Release lease so that another balancer would be able to
-      // perform balancing.
-      out.close();
-      assertTrue("'balancer.id' file doesn't exist!",
-          fs.exists(Balancer.BALANCER_ID_PATH));
-      exitCode = tool.run(args); // start balancing
-      assertEquals("Exit status code mismatches",
-          ExitStatus.SUCCESS.getExitCode(), exitCode);
-    } finally {
-      cluster.shutdown();
-    }
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf,
+        cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
+
+    long totalCapacity = sum(capacities);
+
+    // fill up the cluster to be 30% full
+    final long totalUsedSpace = totalCapacity * 3 / 10;
+    createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+        (short) numOfDatanodes, 0);
+    // start up an empty node with the same capacity and on the same rack
+    cluster.startDataNodes(conf, 1, true, null, new String[] { newRack },
+        new long[] { newCapacity });
+
+    // Case1: Simulate first balancer by creating 'balancer.id' file. It
+    // will keep this file until the balancing operation is completed.
+    FileSystem fs = cluster.getFileSystem(0);
+    final FSDataOutputStream out = fs
+        .create(Balancer.BALANCER_ID_PATH, false);
+    out.writeBytes(InetAddress.getLocalHost().getHostName());
+    out.hflush();
+    assertTrue("'balancer.id' file doesn't exist!",
+        fs.exists(Balancer.BALANCER_ID_PATH));
+
+    // start second balancer
+    final String[] args = { "-policy", "datanode" };
+    final Tool tool = new Cli();
+    tool.setConf(conf);
+    int exitCode = tool.run(args); // start balancing
+    assertEquals("Exit status code mismatches",
+        ExitStatus.IO_EXCEPTION.getExitCode(), exitCode);
+
+    // Case2: Release lease so that another balancer would be able to
+    // perform balancing.
+    out.close();
+    assertTrue("'balancer.id' file doesn't exist!",
+        fs.exists(Balancer.BALANCER_ID_PATH));
+    exitCode = tool.run(args); // start balancing
+    assertEquals("Exit status code mismatches",
+        ExitStatus.SUCCESS.getExitCode(), exitCode);
   }
 
   /** Balancer should not move blocks with size < minBlockSize. */
@@ -1606,101 +1586,97 @@ public class TestBalancer {
         .simulatedCapacities(capacities)
         .build();
     final DistributedFileSystem dfs = cluster.getFileSystem();
+    cluster.waitActive();
+    client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+        ClientProtocol.class).getProxy();
 
-    try {
-      cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf, dfs.getUri(),
-          ClientProtocol.class).getProxy();
-      
-      // fill up the cluster to be 80% full
-      for(int i = 0; i < lengths.length; i++) {
-        final long size = lengths[i];
-        final Path p = new Path("/file" + i + "_size" + size);
-        try(final OutputStream out = dfs.create(p)) {
-          for(int j = 0; j < size; j++) {
-            out.write(j);
-          }
+    // fill up the cluster to be 80% full
+    for(int i = 0; i < lengths.length; i++) {
+      final long size = lengths[i];
+      final Path p = new Path("/file" + i + "_size" + size);
+      try(final OutputStream out = dfs.create(p)) {
+        for(int j = 0; j < size; j++) {
+          out.write(j);
         }
       }
-      
-      // start up an empty node with the same capacity
-      cluster.startDataNodes(conf, capacities.length, true, null, null, 
capacities);
-      LOG.info("capacities    = " + Arrays.toString(capacities));
-      LOG.info("totalUsedSpace= " + totalUsed);
-      LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + 
lengths.length);
-      waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, 
cluster);
-      
-      final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-
-      { // run Balancer with min-block-size=50
-        BalancerParameters.Builder b =
-            new BalancerParameters.Builder();
-        b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
-        b.setThreshold(1);
-        final BalancerParameters p = b.build();
-
-        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 
50);
-        final int r = Balancer.run(namenodes, p, conf);
-        assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
-      }
-      
-      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+    }
 
-      { // run Balancer with empty nodes as source nodes
-        final Set<String> sourceNodes = new HashSet<>();
-        final List<DataNode> datanodes = cluster.getDataNodes();
-        for(int i = capacities.length; i < datanodes.size(); i++) {
-          sourceNodes.add(datanodes.get(i).getDisplayName());
-        }
-        BalancerParameters.Builder b =
-            new BalancerParameters.Builder();
-        b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
-        b.setThreshold(1);
-        b.setSourceNodes(sourceNodes);
-        final BalancerParameters p = b.build();
-
-        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 
50);
-        final int r = Balancer.run(namenodes, p, conf);
-        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
-      }
+    // start up an empty node with the same capacity
+    cluster.startDataNodes(conf, capacities.length, true, null, null, 
capacities);
+    LOG.info("capacities    = " + Arrays.toString(capacities));
+    LOG.info("totalUsedSpace= " + totalUsed);
+    LOG.info("lengths       = " + Arrays.toString(lengths) + ", #=" + 
lengths.length);
+    waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, 
cluster);
+
+    final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+    { // run Balancer with min-block-size=50
+      BalancerParameters.Builder b =
+          new BalancerParameters.Builder();
+      b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+      b.setThreshold(1);
+      final BalancerParameters p = b.build();
 
-      { // run Balancer with a filled node as a source node
-        final Set<String> sourceNodes = new HashSet<>();
-        final List<DataNode> datanodes = cluster.getDataNodes();
-        sourceNodes.add(datanodes.get(0).getDisplayName());
-        BalancerParameters.Builder b =
-            new BalancerParameters.Builder();
-        b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
-        b.setThreshold(1);
-        b.setSourceNodes(sourceNodes);
-        final BalancerParameters p = b.build();
-
-        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 
1);
-        final int r = Balancer.run(namenodes, p, conf);
-        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 
50);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+    }
+
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+    { // run Balancer with empty nodes as source nodes
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      for(int i = capacities.length; i < datanodes.size(); i++) {
+        sourceNodes.add(datanodes.get(i).getDisplayName());
       }
+      BalancerParameters.Builder b =
+          new BalancerParameters.Builder();
+      b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+      b.setThreshold(1);
+      b.setSourceNodes(sourceNodes);
+      final BalancerParameters p = b.build();
 
-      { // run Balancer with all filled node as source nodes
-        final Set<String> sourceNodes = new HashSet<>();
-        final List<DataNode> datanodes = cluster.getDataNodes();
-        for(int i = 0; i < capacities.length; i++) {
-          sourceNodes.add(datanodes.get(i).getDisplayName());
-        }
-        BalancerParameters.Builder b =
-            new BalancerParameters.Builder();
-        b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
-        b.setThreshold(1);
-        b.setSourceNodes(sourceNodes);
-        final BalancerParameters p = b.build();
-
-        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 
1);
-        final int r = Balancer.run(namenodes, p, conf);
-        assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 
50);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+    }
+
+    { // run Balancer with a filled node as a source node
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      sourceNodes.add(datanodes.get(0).getDisplayName());
+      BalancerParameters.Builder b =
+          new BalancerParameters.Builder();
+      b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+      b.setThreshold(1);
+      b.setSourceNodes(sourceNodes);
+      final BalancerParameters p = b.build();
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+    }
+
+    { // run Balancer with all filled node as source nodes
+      final Set<String> sourceNodes = new HashSet<>();
+      final List<DataNode> datanodes = cluster.getDataNodes();
+      for(int i = 0; i < capacities.length; i++) {
+        sourceNodes.add(datanodes.get(i).getDisplayName());
       }
-    } finally {
-      cluster.shutdown();
+      BalancerParameters.Builder b =
+          new BalancerParameters.Builder();
+      b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
+      b.setThreshold(1);
+      b.setSourceNodes(sourceNodes);
+      final BalancerParameters p = b.build();
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+      final int r = Balancer.run(namenodes, p, conf);
+      assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
     }
   }
+
   public void integrationTestWithStripedFile(Configuration conf) throws 
Exception {
     initConfWithStripe(conf);
     doTestBalancerWithStripedFile(conf);
@@ -1778,4 +1754,4 @@ public class TestBalancer {
     balancerTest.testBalancer1();
     balancerTest.testBalancer2();
   }
-}
+}
\ No newline at end of file

Reply via email to