Repository: hadoop
Updated Branches:
  refs/heads/trunk 83798f15f -> 850b2f256


HDFS-1686. Federation: Add more Balancer tests with federation setting.  
Contributed by Bharat Viswanadham


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/850b2f25
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/850b2f25
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/850b2f25

Branch: refs/heads/trunk
Commit: 850b2f256722a360d1378b505832cd99c4c5a686
Parents: 83798f1
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Fri Mar 2 10:42:39 2018 -0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Fri Mar 2 10:42:39 2018 -0800

----------------------------------------------------------------------
 .../TestBalancerWithMultipleNameNodes.java      | 165 +++++++++++++++----
 1 file changed, 133 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/850b2f25/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
index f01d79e..cf4c86f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
@@ -100,6 +100,19 @@ public class TestBalancerWithMultipleNameNodes {
       replication = 1;
       this.parameters = parameters;
     }
+
+    Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
+          BalancerParameters parameters, Configuration conf, short
+              replicationFactor) throws IOException {
+      this.conf = conf;
+      this.cluster = cluster;
+      clients = new ClientProtocol[nNameNodes];
+      for(int i = 0; i < nNameNodes; i++) {
+        clients[i] = cluster.getNameNode(i).getRpcServer();
+      }
+      replication = replicationFactor;
+      this.parameters = parameters;
+    }
   }
 
   /* create a file with a length of <code>fileLen</code> */
@@ -154,7 +167,7 @@ public class TestBalancerWithMultipleNameNodes {
 
   static void runBalancer(Suite s,
       final long totalUsed, final long totalCapacity) throws Exception {
-    final double avg = totalUsed*100.0/totalCapacity;
+    double avg = totalUsed*100.0/totalCapacity;
 
     LOG.info("BALANCER 0: totalUsed=" + totalUsed
         + ", totalCapacity=" + totalCapacity
@@ -180,6 +193,9 @@ public class TestBalancerWithMultipleNameNodes {
     for(boolean balanced = false; !balanced; i++) {
       final long[] used = new long[s.cluster.getDataNodes().size()];
       final long[] cap = new long[used.length];
+      final long[][] bpUsed = new long[s.clients.length][s.cluster
+          .getDataNodes().size()];
+
 
       for(int n = 0; n < s.clients.length; n++) {
         final DatanodeInfo[] datanodes = s.clients[n].getDatanodeReport(
@@ -199,25 +215,52 @@ public class TestBalancerWithMultipleNameNodes {
             Assert.assertEquals(used[d], datanodes[d].getDfsUsed());
             Assert.assertEquals(cap[d], datanodes[d].getCapacity());
           }
+          bpUsed[n][d] = datanodes[d].getBlockPoolUsed();
         }
       }
 
+
+
       balanced = true;
       for(int d = 0; d < used.length; d++) {
-        final double p = used[d]*100.0/cap[d];
-        balanced = p <= avg + s.parameters.getThreshold();
-        if (!balanced) {
-          if (i % 100 == 0) {
-            LOG.warn("datanodes " + d + " is not yet balanced: "
-                + "used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg);
-            LOG.warn("TestBalancer.sum(used)=" + TestBalancer.sum(used)
-                + ", TestBalancer.sum(cap)=" + TestBalancer.sum(cap));
+        double p;
+        if(s.parameters.getBalancingPolicy() == BalancingPolicy.Pool.INSTANCE) 
{
+          for (int k = 0; k < s.parameters.getBlockPools().size(); k++) {
+            avg = TestBalancer.sum(bpUsed[k])*100/totalCapacity;
+            p = bpUsed[k][d] * 100.0 / cap[d];
+            balanced = p <= avg + s.parameters.getThreshold();
+            if (!balanced) {
+              if (i % 100 == 0) {
+                LOG.warn("datanodes " + d + " is not yet balanced: "
+                    + "block pool used=" + bpUsed[d][k] + ", cap=" + cap[d] +
+                    ", avg=" + avg);
+                LOG.warn("sum(blockpoolUsed)=" + TestBalancer.sum(bpUsed[k])
+                    + ", sum(cap)=" + TestBalancer.sum(cap));
+              }
+              sleep(100);
+              break;
+            }
+          }
+          if (!balanced) {
+            break;
+          }
+        } else {
+          p = used[d] * 100.0 / cap[d];
+          balanced = p <= avg + s.parameters.getThreshold();
+          if (!balanced) {
+            if (i % 100 == 0) {
+              LOG.warn("datanodes " + d + " is not yet balanced: "
+                  + "used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg);
+              LOG.warn("sum(used)=" + TestBalancer.sum(used)
+                  + ", sum(cap)=" + TestBalancer.sum(cap));
+            }
+            sleep(100);
+            break;
           }
-          sleep(100);
-          break;
         }
       }
     }
+
     LOG.info("BALANCER 6");
     // cluster is balanced, verify that only selected blockpools were touched
     Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages =
@@ -425,19 +468,26 @@ public class TestBalancerWithMultipleNameNodes {
    * It then adds an empty node and start balancing.
    *
    * @param nNameNodes Number of NameNodes
-   * @param capacities Capacities of the datanodes
    * @param racks Rack names
-   * @param newCapacity the capacity of the new DataNode
    * @param newRack the rack for the new DataNode
    * @param conf Configuration
+   * @param nNameNodestoBalance noOfNameNodestoBalance
+   * @param balancerParameters BalancerParameters
    */ 
-  private void runTest(final int nNameNodes, long[] capacities, String[] racks,
-      long newCapacity, String newRack, Configuration conf) throws Exception {
-    final int nDataNodes = capacities.length;
+  private void runTest(final int nNameNodes, String[] racks,
+                       String[] newRack, Configuration conf,
+                       int nNameNodestoBalance,
+                       BalancerParameters balancerParameters)
+      throws Exception {
+    final int nDataNodes = racks.length;
+    final long[] capacities = new long[nDataNodes];
+    Arrays.fill(capacities, CAPACITY);
     LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
     Assert.assertEquals(nDataNodes, racks.length);
 
-    LOG.info("RUN_TEST -1");
+    LOG.info("RUN_TEST -1: start a cluster with nNameNodes=" + nNameNodes
+        + ", nDataNodes=" + nDataNodes);
+
     final MiniDFSCluster cluster = new MiniDFSCluster
         .Builder(new Configuration(conf))
         .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
@@ -451,44 +501,63 @@ public class TestBalancerWithMultipleNameNodes {
     try {
       cluster.waitActive();
       LOG.info("RUN_TEST 1");
-      final Suite s =
-          new Suite(cluster, nNameNodes, nDataNodes,
-              BalancerParameters.DEFAULT, conf);
+
+      Suite s;
+
+      Set<String> blockpools = new HashSet<>();
+      if(balancerParameters == null) {
+        s = new Suite(cluster, nNameNodes, nDataNodes,
+            BalancerParameters.DEFAULT, conf);
+
+      } else {
+        for (int i=0; i< nNameNodestoBalance; i++) {
+          blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
+        }
+        BalancerParameters.Builder b =
+            new BalancerParameters.Builder();
+        b.setBalancingPolicy(balancerParameters.getBalancingPolicy());
+        b.setBlockpools(blockpools);
+        BalancerParameters params = b.build();
+        s = new Suite(cluster, nNameNodes, nDataNodes, params, conf, (short)2);
+      }
       long totalCapacity = TestBalancer.sum(capacities);
 
-      LOG.info("RUN_TEST 2");
+      LOG.info("RUN_TEST 2: create files");
       // fill up the cluster to be 30% full
-      final long totalUsed = totalCapacity*3/10;
+      final long totalUsed = (totalCapacity * s.replication)*3/10;
       final long size = (totalUsed/nNameNodes)/s.replication;
       for(int n = 0; n < nNameNodes; n++) {
         createFile(s, n, size);
       }
 
-      LOG.info("RUN_TEST 3");
+      LOG.info("RUN_TEST 3: " + newRack.length + " new datanodes");
       // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(conf, 1, true, null,
-          new String[]{newRack}, new long[]{newCapacity});
+      final long[] newCapacity = new long[newRack.length];
+      Arrays.fill(newCapacity, CAPACITY);
+      cluster.startDataNodes(conf, newCapacity.length, true, null,
+          newRack, newCapacity);
 
-      totalCapacity += newCapacity;
+      totalCapacity += TestBalancer.sum(newCapacity);
 
-      LOG.info("RUN_TEST 4");
+      LOG.info("RUN_TEST 4: run Balancer");
       // run RUN_TEST and validate results
       runBalancer(s, totalUsed, totalCapacity);
       LOG.info("RUN_TEST 5");
     } finally {
       cluster.shutdown();
     }
-    LOG.info("RUN_TEST 6");
+    LOG.info("RUN_TEST 6: done");
   }
+
   
   /** Test a cluster with even distribution, 
-   * then a new empty node is added to the cluster
+   * then a new empty node is added to the cluster.
    */
   @Test
-  public void testBalancer() throws Exception {
+  public void testTwoOneOne() throws Exception {
     final Configuration conf = createConf();
-    runTest(2, new long[]{CAPACITY}, new String[]{RACK0},
-        CAPACITY/2, RACK0, conf);
+    runTest(2, new String[]{RACK0}, new String[] {RACK0}, conf,
+        2, null);
   }
 
   /** Test unevenly distributed cluster */
@@ -517,4 +586,36 @@ public class TestBalancerWithMultipleNameNodes {
         5 * CAPACITY / 100, 10 * CAPACITY / 100 }, new long[] { CAPACITY,
         CAPACITY, CAPACITY }, new String[] { RACK0, RACK1, RACK2 }, conf);
   }
+
+  /** Even distribution with 2 Namenodes, 4 Datanodes and 2 new Datanodes. */
+  @Test(timeout = 600000)
+  public void testTwoFourTwo() throws Exception {
+    final Configuration conf = createConf();
+    runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1},
+        new String[]{RACK2, RACK2}, conf, 2, null);
+  }
+
+  @Test(timeout=600000)
+  public void testBalancingBlockpoolsWithBlockPoolPolicy() throws Exception {
+    final Configuration conf = createConf();
+    BalancerParameters balancerParameters = new BalancerParameters.Builder()
+        .setBalancingPolicy(BalancingPolicy.Pool.INSTANCE).build();
+    runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1},
+        new String[]{RACK2, RACK2}, conf, 2,
+        balancerParameters);
+  }
+
+  @Test(timeout = 600000)
+  public void test1OutOf2BlockpoolsWithBlockPoolPolicy()
+      throws
+      Exception {
+    final Configuration conf = createConf();
+    BalancerParameters balancerParameters = new BalancerParameters.Builder()
+        .setBalancingPolicy(BalancingPolicy.Pool.INSTANCE).build();
+    runTest(2, new String[]{RACK0, RACK0, RACK1, RACK1},
+        new String[]{RACK2, RACK2}, conf, 1,
+        balancerParameters);
+  }
+
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to