Repository: hadoop
Updated Branches:
  refs/heads/HDFS-1312 c87c8458d -> 3258c8b03


HDFS-9709. DiskBalancer : Add tests for disk balancer using a Mock Mover class. 
Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3258c8b0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3258c8b0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3258c8b0

Branch: refs/heads/HDFS-1312
Commit: 3258c8b034a38a1e684164e12ef7da00263edbc0
Parents: c87c845
Author: Anu Engineer <aengin...@apache.org>
Authored: Tue Mar 22 16:26:49 2016 -0700
Committer: Anu Engineer <aengin...@apache.org>
Committed: Tue Mar 22 16:26:49 2016 -0700

----------------------------------------------------------------------
 .../hdfs/server/datanode/DiskBalancer.java      |  14 +-
 .../DiskBalancerResultVerifier.java             |  42 ++
 .../diskbalancer/TestDiskBalancerRPC.java       |  39 +-
 .../TestDiskBalancerWithMockMover.java          | 570 +++++++++++++++++++
 4 files changed, 628 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3258c8b0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index d1bc1f1..972f0fc 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -40,7 +40,11 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -106,6 +110,7 @@ public class DiskBalancer {
       this.isDiskBalancerEnabled = false;
       this.currentResult = Result.NO_PLAN;
       if ((this.future != null) && (!this.future.isDone())) {
+        this.currentResult = Result.PLAN_CANCELLED;
         this.blockMover.setExitFlag();
         shutdownExecutor();
       }
@@ -120,9 +125,9 @@ public class DiskBalancer {
   private void shutdownExecutor() {
     scheduler.shutdown();
     try {
-      if(!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
+      if(!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
         scheduler.shutdownNow();
-        if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
+        if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
           LOG.error("Disk Balancer : Scheduler did not terminate.");
         }
       }
@@ -218,6 +223,7 @@ public class DiskBalancer {
       if (!this.future.isDone()) {
         this.blockMover.setExitFlag();
         shutdownExecutor();
+        this.currentResult = Result.PLAN_CANCELLED;
       }
     } finally {
       lock.unlock();
@@ -537,7 +543,7 @@ public class DiskBalancer {
   /**
    * Holds references to actual volumes that we will be operating against.
    */
-  static class VolumePair {
+  public static class VolumePair {
     private final FsVolumeSpi source;
     private final FsVolumeSpi dest;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3258c8b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerResultVerifier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerResultVerifier.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerResultVerifier.java
new file mode 100644
index 0000000..5abb33c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerResultVerifier.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Hamcrest matcher that accepts a {@link DiskBalancerException} only when
+ * its result code is the one the test expects.
+ */
+public class DiskBalancerResultVerifier
+    extends TypeSafeMatcher<DiskBalancerException> {
+  private final DiskBalancerException.Result expectedResult;
+
+  /**
+   * @param expectedResult result code the matched exception must carry.
+   */
+  DiskBalancerResultVerifier(DiskBalancerException.Result expectedResult) {
+    this.expectedResult = expectedResult;
+  }
+
+  /**
+   * @param exception exception under test.
+   * @return true when the exception's result is the expected one.
+   */
+  @Override
+  protected boolean matchesSafely(DiskBalancerException exception) {
+    DiskBalancerException.Result actual = exception.getResult();
+    return actual == expectedResult;
+  }
+
+  /**
+   * Appends the expectation text used in mismatch reports.
+   */
+  @Override
+  public void describeTo(Description description) {
+    description.appendText("expects Result: ");
+    description.appendValue(expectedResult);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3258c8b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
index 9cd41c2..a65ed21 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerRPC.java
@@ -90,8 +90,7 @@ public class TestDiskBalancerRPC {
     int planVersion = rpcTestHelper.getPlanVersion();
     NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
-    thrown.expect(new
-        ResultVerifier(Result.INVALID_PLAN_HASH));
+    thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_HASH));
     dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
   }
 
@@ -104,8 +103,7 @@ public class TestDiskBalancerRPC {
     planVersion++;
     NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
-    thrown.expect(new
-        ResultVerifier(Result.INVALID_PLAN_VERSION));
+    thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN_VERSION));
     dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, plan.toJson());
   }
 
@@ -117,8 +115,7 @@ public class TestDiskBalancerRPC {
     int planVersion = rpcTestHelper.getPlanVersion();
     NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
-    thrown.expect(new
-        ResultVerifier(Result.INVALID_PLAN));
+    thrown.expect(new DiskBalancerResultVerifier(Result.INVALID_PLAN));
     dataNode.submitDiskBalancerPlan(planHash, planVersion, 10, "");
   }
 
@@ -143,8 +140,7 @@ public class TestDiskBalancerRPC {
     planHash = String.valueOf(hashArray);
     NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
-    thrown.expect(new
-        ResultVerifier(Result.NO_SUCH_PLAN));
+    thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
     dataNode.cancelDiskBalancePlan(planHash);
   }
 
@@ -155,8 +151,7 @@ public class TestDiskBalancerRPC {
     String planHash = "";
     NodePlan plan = rpcTestHelper.getPlan();
     thrown.expect(DiskBalancerException.class);
-    thrown.expect(new
-        ResultVerifier(Result.NO_SUCH_PLAN));
+    thrown.expect(new DiskBalancerResultVerifier(Result.NO_SUCH_PLAN));
     dataNode.cancelDiskBalancePlan(planHash);
   }
 
@@ -182,8 +177,7 @@ public class TestDiskBalancerRPC {
     final String invalidSetting = "invalidSetting";
     DataNode dataNode = cluster.getDataNodes().get(dnIndex);
     thrown.expect(DiskBalancerException.class);
-    thrown.expect(new
-        ResultVerifier(Result.UNKNOWN_KEY));
+    thrown.expect(new DiskBalancerResultVerifier(Result.UNKNOWN_KEY));
     dataNode.getDiskBalancerSetting(invalidSetting);
   }
 
@@ -274,25 +268,4 @@ public class TestDiskBalancerRPC {
       return this;
     }
   }
-
-  private class ResultVerifier
-      extends TypeSafeMatcher<DiskBalancerException> {
-    private final DiskBalancerException.Result expectedResult;
-
-    ResultVerifier(DiskBalancerException.Result expectedResult){
-      this.expectedResult = expectedResult;
-    }
-
-    @Override
-    protected boolean matchesSafely(DiskBalancerException exception) {
-      return (this.expectedResult == exception.getResult());
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("expects Result: ")
-          .appendValue(this.expectedResult);
-
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3258c8b0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
new file mode 100644
index 0000000..ed761ed
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.server.diskbalancer;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
+import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
+import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
+import 
org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
+import 
org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.GreedyPlanner;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.MoveStep;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
+import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
+import org.apache.hadoop.util.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.NO_PLAN;
+import static 
org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result.PLAN_DONE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestDiskBalancerWithMockMover {
+  static final Log LOG = 
LogFactory.getLog(TestDiskBalancerWithMockMover.class);
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  MiniDFSCluster cluster;
+  String sourceName;
+  String destName;
+  String sourceUUID;
+  String destUUID;
+  String nodeID;
+  DataNode dataNode;
+
+  /**
+   * Verifies that a status query on a balancer built with the disk balancer
+   * disabled fails with DISK_BALANCER_NOT_ENABLED.
+   */
+  @Test
+  public void testDiskBalancerDisabled() throws Exception {
+    Configuration disabledConf = new HdfsConfiguration();
+    disabledConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, false);
+    restartDataNode();
+
+    DataNode firstNode = cluster.getDataNodes().get(0);
+    TestMover mover = new TestMover(firstNode.getFSDataset());
+    DiskBalancer balancer =
+        new DiskBalancerBuilder(disabledConf).setMover(mover).build();
+
+    DiskBalancerException.Result expected =
+        DiskBalancerException.Result.DISK_BALANCER_NOT_ENABLED;
+    thrown.expect(DiskBalancerException.class);
+    thrown.expect(new DiskBalancerResultVerifier(expected));
+
+    balancer.queryWorkStatus();
+  }
+
+  /**
+   * Verifies that a balancer built with the disk balancer enabled answers a
+   * status query with NO_PLAN before any plan has been submitted.
+   *
+   * @throws DiskBalancerException if the status query is rejected.
+   */
+  @Test
+  public void testDiskBalancerEnabled() throws DiskBalancerException {
+    Configuration enabledConf = new HdfsConfiguration();
+    enabledConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+
+    DataNode firstNode = cluster.getDataNodes().get(0);
+    TestMover mover = new TestMover(firstNode.getFSDataset());
+    DiskBalancer balancer =
+        new DiskBalancerBuilder(enabledConf).setMover(mover).build();
+
+    assertEquals(NO_PLAN, balancer.queryWorkStatus().getResult());
+  }
+
+  private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer,
+                                 int version) throws IOException {
+    String planJson = plan.toJson();
+    String planID = DigestUtils.sha512Hex(planJson);
+    balancer.submitPlan(planID, version, planJson, 10, false);
+  }
+
+  private void executeSubmitPlan(NodePlan plan, DiskBalancer balancer)
+      throws IOException {
+    executeSubmitPlan(plan, balancer, 1);
+  }
+
+  /**
+   * Test a second submit plan fails.
+   *
+   * The mock mover is told to block inside copyBlocks so the first plan is
+   * still running when the second submit arrives; that second submit is
+   * expected to be rejected with PLAN_ALREADY_IN_PROGRESS.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testResubmitDiskBalancerPlan() throws Exception {
+    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+    NodePlan plan = mockMoverHelper.getPlan();
+    DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+    // ask block mover to get stuck in copy block
+    mockMoverHelper.getBlockMover().setSleep();
+    try {
+      executeSubmitPlan(plan, balancer);
+      thrown.expect(DiskBalancerException.class);
+      thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+          .Result.PLAN_ALREADY_IN_PROGRESS));
+      executeSubmitPlan(plan, balancer);
+    } finally {
+      // The expected exception ends the test body, so the cleanup has to run
+      // from finally; otherwise the mover thread is left waiting.
+      mockMoverHelper.getBlockMover().clearSleep();
+    }
+  }
+
+  /**
+   * Submits a plan and checks that the mock mover was executed exactly once
+   * in the background.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSubmitDiskBalancerPlan() throws Exception {
+    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+    NodePlan plan = mockMoverHelper.getPlan();
+    DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+    executeSubmitPlan(plan, balancer);
+
+    // Poll for PLAN_DONE, giving up after three one-second waits.
+    int counter = 0;
+    while ((balancer.queryWorkStatus().getResult() != PLAN_DONE) &&
+        (counter < 3)) {
+      Thread.sleep(1000);
+      counter++;
+    }
+
+    // Asserts that submit plan caused an execution in the background.
+    // assertEquals reports the actual run count when the check fails.
+    assertEquals(1, mockMoverHelper.getBlockMover().getRunCount());
+  }
+
+  /**
+   * Verifies that a plan whose timestamp is 32 hours in the past is rejected
+   * with OLD_PLAN_SUBMITTED.
+   */
+  @Test
+  public void testSubmitWithOlderPlan() throws Exception {
+    final long staleAgeMillis = 32 * (1000 * 60 * 60L);
+    MockMoverHelper helper = new MockMoverHelper().invoke();
+    NodePlan stalePlan = helper.getPlan();
+    DiskBalancer balancer = helper.getBalancer();
+
+    stalePlan.setTimeStamp(Time.now() - staleAgeMillis);
+
+    DiskBalancerException.Result expected =
+        DiskBalancerException.Result.OLD_PLAN_SUBMITTED;
+    thrown.expect(DiskBalancerException.class);
+    thrown.expect(new DiskBalancerResultVerifier(expected));
+    executeSubmitPlan(stalePlan, balancer);
+  }
+
+  /**
+   * Verifies that submitting a plan with version 0 is rejected with
+   * INVALID_PLAN_VERSION.
+   */
+  @Test
+  public void testSubmitWithOldInvalidVersion() throws Exception {
+    MockMoverHelper helper = new MockMoverHelper().invoke();
+    NodePlan plan = helper.getPlan();
+    DiskBalancer balancer = helper.getBalancer();
+
+    DiskBalancerException.Result expected =
+        DiskBalancerException.Result.INVALID_PLAN_VERSION;
+    thrown.expect(DiskBalancerException.class);
+    thrown.expect(new DiskBalancerResultVerifier(expected));
+
+    // Plan version is invalid -- there is no version 0.
+    final int invalidVersion = 0;
+    executeSubmitPlan(plan, balancer, invalidVersion);
+  }
+
+  /**
+   * Verifies that submitting a null plan body, with an otherwise valid plan
+   * ID and version, is rejected with INVALID_PLAN.
+   */
+  @Test
+  public void testSubmitWithNullPlan() throws Exception {
+    MockMoverHelper helper = new MockMoverHelper().invoke();
+    NodePlan plan = helper.getPlan();
+    DiskBalancer balancer = helper.getBalancer();
+    String planID = DigestUtils.sha512Hex(plan.toJson());
+
+    DiskBalancerException.Result expected =
+        DiskBalancerException.Result.INVALID_PLAN;
+    thrown.expect(DiskBalancerException.class);
+    thrown.expect(new DiskBalancerResultVerifier(expected));
+
+    balancer.submitPlan(planID, 1, null, 10, false);
+  }
+
+  /**
+   * Verifies that a plan submitted with an ID that does not match the hash
+   * of its JSON is rejected with INVALID_PLAN_HASH.
+   */
+  @Test
+  public void testSubmitWithInvalidHash() throws Exception {
+    MockMoverHelper helper = new MockMoverHelper().invoke();
+    NodePlan plan = helper.getPlan();
+    DiskBalancer balancer = helper.getBalancer();
+
+    String planJson = plan.toJson();
+    String planID = DigestUtils.sha512Hex(planJson);
+
+    // Corrupt the hash: every occurrence of its first character is replaced
+    // by the next character value.
+    char firstChar = planID.charAt(0);
+    char nextChar = (char) (firstChar + 1);
+    String corruptedID = planID.replace(firstChar, nextChar);
+
+    DiskBalancerException.Result expected =
+        DiskBalancerException.Result.INVALID_PLAN_HASH;
+    thrown.expect(DiskBalancerException.class);
+    thrown.expect(new DiskBalancerResultVerifier(expected));
+    balancer.submitPlan(corruptedID, 1, planJson, 10, false);
+  }
+
+  /**
+   * Test Cancel Plan.
+   *
+   * Cancels a plan while the mock mover is blocked and checks that the
+   * balancer reports PLAN_CANCELLED; then resubmits the plan and checks that
+   * a cancel request with a non-matching plan ID is rejected with
+   * NO_SUCH_PLAN.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testCancelDiskBalancerPlan() throws Exception {
+    MockMoverHelper mockMoverHelper = new MockMoverHelper().invoke();
+    NodePlan plan = mockMoverHelper.getPlan();
+    DiskBalancer balancer = mockMoverHelper.getBalancer();
+
+    // ask block mover to delay execution
+    mockMoverHelper.getBlockMover().setSleep();
+    try {
+      executeSubmitPlan(plan, balancer);
+
+      String planJson = plan.toJson();
+      String planID = DigestUtils.sha512Hex(planJson);
+      balancer.cancelPlan(planID);
+
+      DiskBalancerWorkStatus status = balancer.queryWorkStatus();
+      assertEquals(DiskBalancerWorkStatus.Result.PLAN_CANCELLED,
+          status.getResult());
+
+      executeSubmitPlan(plan, balancer);
+
+      // Send a Wrong cancellation request.
+      char first = planID.charAt(0);
+      first++;
+      thrown.expect(DiskBalancerException.class);
+      thrown.expect(new DiskBalancerResultVerifier(DiskBalancerException
+          .Result.NO_SUCH_PLAN));
+      balancer.cancelPlan(planID.replace(planID.charAt(0), first));
+      // Nothing may follow the call above: the expected exception ends the
+      // test body. Cancelling a real plan is already asserted earlier in
+      // this method.
+    } finally {
+      // unblock mover; must run from finally because of the expected throw.
+      mockMoverHelper.getBlockMover().clearSleep();
+    }
+  }
+
+  /**
+   * Starts a three-datanode mini cluster with two storages per datanode and
+   * records the node UUID plus the base path and storage ID of the first two
+   * volumes of the first datanode.
+   *
+   * @throws Exception if the cluster cannot be started or the volumes
+   *                   cannot be read.
+   */
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final int NUM_STORAGES_PER_DN = 2;
+    cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(3)
+        .storagesPerDatanode(NUM_STORAGES_PER_DN)
+        .build();
+    cluster.waitActive();
+    dataNode = cluster.getDataNodes().get(0);
+    nodeID = dataNode.getDatanodeUuid();
+
+    FsDatasetSpi.FsVolumeReferences references = dataNode.getFSDataset()
+        .getFsVolumeReferences();
+    try {
+      sourceName = references.get(0).getBasePath();
+      destName = references.get(1).getBasePath();
+      sourceUUID = references.get(0).getStorageID();
+      destUUID = references.get(1).getStorageID();
+    } finally {
+      // Release the volume references even if a lookup above throws.
+      references.close();
+    }
+  }
+
+  /**
+   * Shuts the mini cluster down if one was started.
+   */
+  @After
+  public void tearDown() throws Exception {
+    if (cluster == null) {
+      return;
+    }
+    cluster.shutdown();
+  }
+
+  /**
+   * Restarts the first datanode of the mini cluster; does nothing when no
+   * cluster is present.
+   *
+   * @throws IOException if the restart fails.
+   */
+  private void restartDataNode() throws IOException {
+    if (cluster == null) {
+      return;
+    }
+    cluster.restartDataNode(0);
+  }
+
+  /**
+   * Allows us to control mover class for test purposes.
+   */
+  public static class TestMover implements DiskBalancer.BlockMover {
+
+    private AtomicBoolean shouldRun;
+    private FsDatasetSpi dataset;
+    private Integer runCount;
+    private volatile boolean sleepInCopyBlocks;
+    private long delay;
+
+    public TestMover(FsDatasetSpi dataset) {
+      this.dataset = dataset;
+      this.shouldRun = new AtomicBoolean(false);
+      this.runCount = new Integer(0);
+    }
+
+    public void setSleep() {
+      sleepInCopyBlocks = true;
+    }
+
+    public void clearSleep() {
+      sleepInCopyBlocks = false;
+    }
+
+    public void setDelay(long milliseconds) {
+      this.delay = milliseconds;
+    }
+
+    /**
+     * Copies blocks from a set of volumes.
+     *
+     * @param pair - Source and Destination Volumes.
+     * @param item - Number of bytes to move from volumes.
+     */
+    @Override
+    public void copyBlocks(DiskBalancer.VolumePair pair,
+                           DiskBalancerWorkItem item) {
+
+      try {
+        // get stuck if we are asked to sleep.
+        while (sleepInCopyBlocks) {
+          if (!this.shouldRun()) {
+            return;
+          }
+          Thread.sleep(10);
+        }
+        if (delay > 0) {
+          Thread.sleep(delay);
+        }
+        synchronized (runCount) {
+          if (shouldRun()) {
+            runCount++;
+          }
+        }
+      } catch (InterruptedException ex) {
+        // A failure here can be safely ignored with no impact for tests.
+        LOG.error(ex.toString());
+      }
+    }
+
+    /**
+     * Sets copyblocks into runnable state.
+     */
+    @Override
+    public void setRunnable() {
+      this.shouldRun.set(true);
+    }
+
+    /**
+     * Signals copy block to exit.
+     */
+    @Override
+    public void setExitFlag() {
+      this.shouldRun.set(false);
+    }
+
+    /**
+     * Returns the shouldRun boolean flag.
+     */
+    public boolean shouldRun() {
+      return this.shouldRun.get();
+    }
+
+    @Override
+    public FsDatasetSpi getDataset() {
+      return this.dataset;
+    }
+
+    public int getRunCount() {
+      synchronized (runCount) {
+        LOG.info("Run count : " + runCount.intValue());
+        return runCount.intValue();
+      }
+    }
+  }
+
+  /**
+   * Fixture that wires a TestMover, a DiskBalancer using it, and a NodePlan
+   * generated from the bundled three-node JSON cluster description.
+   */
+  private class MockMoverHelper {
+    private DiskBalancer balancer;
+    private NodePlan plan;
+    private TestMover blockMover;
+
+    /** @return balancer created by {@link #invoke()}. */
+    public DiskBalancer getBalancer() {
+      return balancer;
+    }
+
+    /** @return plan created by {@link #invoke()}. */
+    public NodePlan getPlan() {
+      return plan;
+    }
+
+    /** @return mock mover created by {@link #invoke()}. */
+    public TestMover getBlockMover() {
+      return blockMover;
+    }
+
+    /**
+     * Restarts the datanode, then builds the mover (already set runnable),
+     * the balancer and the plan.
+     *
+     * @return this helper, for chaining.
+     * @throws Exception if any build step fails.
+     */
+    public MockMoverHelper invoke() throws Exception {
+      Configuration enabledConf = new HdfsConfiguration();
+      enabledConf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true);
+      restartDataNode();
+
+      blockMover = new TestMover(dataNode.getFSDataset());
+      blockMover.setRunnable();
+
+      DiskBalancerBuilder balancerBuilder =
+          new DiskBalancerBuilder(enabledConf);
+      balancerBuilder.setMover(blockMover);
+      balancerBuilder.setNodeID(nodeID);
+      balancer = balancerBuilder.build();
+
+      DiskBalancerClusterBuilder clusterBuilder =
+          new DiskBalancerClusterBuilder();
+      clusterBuilder.setClusterSource(
+          "/diskBalancer/data-cluster-3node-3disk.json");
+      DiskBalancerCluster modelCluster = clusterBuilder.build();
+
+      PlanBuilder planBuilder = new PlanBuilder(modelCluster, nodeID);
+      planBuilder.setPathMap(sourceName, destName);
+      planBuilder.setUUIDMap(sourceUUID, destUUID);
+      plan = planBuilder.build();
+      return this;
+    }
+  }
+
+  private class DiskBalancerBuilder {
+    private TestMover blockMover;
+    private Configuration conf;
+    private String nodeID;
+
+    public DiskBalancerBuilder(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public DiskBalancerBuilder setNodeID(String nodeID) {
+      this.nodeID = nodeID;
+      return this;
+    }
+
+    public DiskBalancerBuilder setConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public DiskBalancerBuilder setMover(TestMover mover) {
+      this.blockMover = mover;
+      return this;
+    }
+
+    public DiskBalancerBuilder setRunnable() {
+      blockMover.setRunnable();
+      return this;
+    }
+
+    public DiskBalancer build() {
+      Preconditions.checkNotNull(blockMover);
+      return new DiskBalancer(nodeID, conf,
+          blockMover);
+    }
+  }
+
+  /**
+   * Builds a DiskBalancerCluster model from a JSON cluster description that
+   * is looked up as a classpath resource.
+   */
+  private class DiskBalancerClusterBuilder {
+    private String jsonFilePath;
+    private Configuration conf;
+
+    /** Sets the configuration handed to the cluster connector. */
+    public DiskBalancerClusterBuilder setConf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    /**
+     * Sets the classpath location of the JSON cluster description.
+     *
+     * @param jsonFilePath resource path of the JSON file.
+     */
+    public DiskBalancerClusterBuilder setClusterSource(String jsonFilePath)
+        throws Exception {
+      this.jsonFilePath = jsonFilePath;
+      return this;
+    }
+
+    /**
+     * Reads the cluster description and marks all of its nodes for
+     * processing.
+     *
+     * @return populated cluster model.
+     * @throws NullPointerException if no cluster source was set or the
+     *                              resource cannot be found.
+     * @throws Exception if the description cannot be read.
+     */
+    public DiskBalancerCluster build() throws Exception {
+      Preconditions.checkNotNull(jsonFilePath,
+          "Cluster source must be set before build()");
+      // getResource returns null for a missing resource; fail with the
+      // offending path instead of a bare NullPointerException.
+      URI clusterJson = Preconditions.checkNotNull(
+          getClass().getResource(jsonFilePath),
+          "Cluster JSON resource not found: %s", jsonFilePath).toURI();
+      ClusterConnector jsonConnector =
+          ConnectorFactory.getCluster(clusterJson, conf);
+      DiskBalancerCluster diskBalancerCluster =
+          new DiskBalancerCluster(jsonConnector);
+      diskBalancerCluster.readClusterInfo();
+      diskBalancerCluster.setNodesToProcess(diskBalancerCluster.getNodes());
+      return diskBalancerCluster;
+    }
+  }
+
+  /**
+   * Builds a NodePlan for the first node of a cluster model and rewrites the
+   * source and destination volume of every step to the configured path and
+   * UUID.
+   */
+  private class PlanBuilder {
+    private String sourcePath;
+    private String destPath;
+    private String sourceUUID;
+    private String destUUID;
+    private DiskBalancerCluster balancerCluster;
+    private String nodeID;
+
+    /**
+     * @param balancerCluster cluster model to plan against.
+     * @param nodeID          UUID assigned to the planned node.
+     */
+    public PlanBuilder(DiskBalancerCluster balancerCluster, String nodeID) {
+      this.balancerCluster = balancerCluster;
+      this.nodeID = nodeID;
+    }
+
+    /** Sets the paths written into each step's source and destination. */
+    public PlanBuilder setPathMap(String sourcePath, String destPath) {
+      this.sourcePath = sourcePath;
+      this.destPath = destPath;
+      return this;
+    }
+
+    /** Sets the UUIDs written into each step's source and destination. */
+    public PlanBuilder setUUIDMap(String sourceUUID, String destUUID) {
+      this.sourceUUID = sourceUUID;
+      this.destUUID = destUUID;
+      return this;
+    }
+
+    /**
+     * Plans the "DISK" volume set of node 0 with a GreedyPlanner and then
+     * applies the configured volume names.
+     *
+     * @return the generated plan.
+     * @throws Exception if planning fails.
+     */
+    public NodePlan build() throws Exception {
+      final int firstNodeIndex = 0;
+      Preconditions.checkNotNull(balancerCluster);
+      Preconditions.checkState(!nodeID.isEmpty());
+
+      DiskBalancerDataNode target =
+          balancerCluster.getNodes().get(firstNodeIndex);
+      target.setDataNodeUUID(nodeID);
+      GreedyPlanner planner = new GreedyPlanner(10.0f, target);
+      NodePlan result =
+          new NodePlan(target.getDataNodeName(), target.getDataNodePort());
+      planner.balanceVolumeSet(target, target.getVolumeSets().get("DISK"),
+          result);
+      setVolumeNames(result);
+      return result;
+    }
+
+    /**
+     * Overwrites path and UUID on both volumes of every step in the plan.
+     */
+    private void setVolumeNames(NodePlan plan) {
+      for (Iterator<Step> steps = plan.getVolumeSetPlans().iterator();
+           steps.hasNext();) {
+        MoveStep move = (MoveStep) steps.next();
+        move.getSourceVolume().setPath(sourcePath);
+        move.getSourceVolume().setUuid(sourceUUID);
+        move.getDestinationVolume().setPath(destPath);
+        move.getDestinationVolume().setUuid(destUUID);
+      }
+    }
+
+  }
+}
+

Reply via email to