HDFS-7034. Archival Storage: Fix TestBlockPlacement and TestStorageMover. 
Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d85f7e5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d85f7e5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d85f7e5

Branch: refs/heads/HDFS-6581
Commit: 0d85f7e59146cc3e9a040c2203995f3efd8ed4eb
Parents: 70dfe9c
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Sep 11 13:00:43 2014 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Sep 11 13:00:43 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   2 +-
 .../hadoop/hdfs/server/balancer/Dispatcher.java |  27 +-
 .../hdfs/server/balancer/NameNodeConnector.java |  20 +-
 .../server/blockmanagement/BlockManager.java    |   4 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   4 +-
 .../apache/hadoop/hdfs/server/mover/Mover.java  |  10 +-
 .../hadoop/hdfs/TestBlockStoragePolicy.java     |   8 +-
 .../hdfs/server/mover/TestStorageMover.java     | 286 ++++++++++---------
 8 files changed, 208 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f3edbf5..641fb52 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -372,7 +372,7 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000;
   public static final String  DFS_BALANCER_DISPATCHERTHREADS_KEY = 
"dfs.balancer.dispatcherThreads";
   public static final int     DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
-  
+
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = 
"dfs.mover.movedWinWidth";
   public static final long    DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
   public static final String  DFS_MOVER_MOVERTHREADS_KEY = 
"dfs.mover.moverThreads";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 98bd58e..f2a1299 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.StorageType;
@@ -88,7 +89,11 @@ public class Dispatcher {
   private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB;
 
   private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
-  private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds
+  /**
+   * the period of time to delay the usage of a DataNode after hitting
+   * errors when using it for migrating data
+   */
+  private static long delayAfterErrors = 10 * 1000;
 
   private final NameNodeConnector nnc;
   private final SaslDataTransferClient saslClient;
@@ -112,6 +117,7 @@ public class Dispatcher {
 
   private final ExecutorService moveExecutor;
   private final ExecutorService dispatchExecutor;
+
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
 
@@ -187,10 +193,12 @@ public class Dispatcher {
 
     @Override
     public String toString() {
-      final Block b = block.getBlock();
-      return b + " with size=" + b.getNumBytes() + " from "
-          + source.getDisplayName() + " to " + target.getDisplayName()
-          + " through " + proxySource.datanode;
+      final Block b = block != null ? block.getBlock() : null;
+      String bStr = b != null ? (b + " with size=" + b.getNumBytes() + " ")
+          : " ";
+      return bStr + "from " + source.getDisplayName() + " to " + target
+          .getDisplayName() + " through " + (proxySource != null ? proxySource
+          .datanode : "");
     }
 
     /**
@@ -316,8 +324,8 @@ public class Dispatcher {
         // further in order to avoid a potential storm of "threads quota
         // exceeded" warnings when the dispatcher gets out of sync with work
         // going on in datanodes.
-        proxySource.activateDelay(DELAY_AFTER_ERROR);
-        target.getDDatanode().activateDelay(DELAY_AFTER_ERROR);
+        proxySource.activateDelay(delayAfterErrors);
+        target.getDDatanode().activateDelay(delayAfterErrors);
       } finally {
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
@@ -1043,6 +1051,11 @@ public class Dispatcher {
     blockMoveWaitTime = time;
   }
 
+  @VisibleForTesting
+  public static void setDelayAfterErrors(long time) {
+    delayAfterErrors = time;
+  }
+
   /** shutdown thread pools */
   public void shutdownNow() {
     if (dispatchExecutor != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 79815c0..9e08d51 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -25,13 +25,16 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -53,6 +56,7 @@ public class NameNodeConnector implements Closeable {
   private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
 
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
+  private static boolean createIdFile = true;
   
   /** Create {@link NameNodeConnector} for the given namenodes. */
   public static List<NameNodeConnector> newNameNodeConnectors(
@@ -83,6 +87,11 @@ public class NameNodeConnector implements Closeable {
     return connectors;
   }
 
+  @VisibleForTesting
+  public static void setCreateIdFile(boolean create) {
+    createIdFile = create;
+  }
+
   private final URI nameNodeUri;
   private final String blockpoolID;
 
@@ -117,9 +126,10 @@ public class NameNodeConnector implements Closeable {
     final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
     this.keyManager = new KeyManager(blockpoolID, namenode,
         defaults.getEncryptDataTransfer(), conf);
-    // Exit if there is another one running.
-    out = checkAndMarkRunning(); 
-    if (out == null) {
+    // if it is for test, we do not create the id file
+    out = createIdFile ? checkAndMarkRunning() : null;
+    if (createIdFile && out == null) {
+      // Exit if there is another one running.
       throw new IOException("Another " + name + " is running.");
     }
   }
@@ -188,9 +198,9 @@ public class NameNodeConnector implements Closeable {
    */
   private OutputStream checkAndMarkRunning() throws IOException {
     try {
-      final DataOutputStream out = fs.create(idPath);
+      final FSDataOutputStream out = fs.create(idPath);
       out.writeBytes(InetAddress.getLocalHost().getHostName());
-      out.flush();
+      out.hflush();
       return out;
     } catch(RemoteException e) {
       
if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 956900d..cb303a7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1502,7 +1502,7 @@ public class BlockManager {
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      List, boolean, Set, long, StorageType)
+   *      Set, long, List, BlockStoragePolicy)
    */
   public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -2811,7 +2811,7 @@ public class BlockManager {
       return false; // only consider delHint for the first case
     } else if (delHint == null) {
       return false; // no delHint
-    } else if (!excessTypes.remove(delHint.getStorageType())) {
+    } else if (!excessTypes.contains(delHint.getStorageType())) {
       return false; // delHint storage type is not an excess type
     } else {
       // check if removing delHint reduces the number of racks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 3810621..9b57262 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1744,7 +1744,9 @@ public class DataNode extends Configured
             + b + " (numBytes=" + b.getNumBytes() + ")"
             + ", stage=" + stage
             + ", clientname=" + clientname
-            + ", targets=" + Arrays.asList(targets));
+            + ", targets=" + Arrays.asList(targets)
+            + ", target storage types=" + (targetStorageTypes == null ? "[]" :
+            Arrays.asList(targetStorageTypes)));
       }
       this.targets = targets;
       this.targetStorageTypes = targetStorageTypes;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index c5d6dab..96588ff 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -328,8 +328,6 @@ public class Mover {
           if (scheduleMoves4Block(diff, lb)) {
             hasRemaining |= (diff.existing.size() > 1 &&
                 diff.expected.size() > 1);
-          } else {
-            hasRemaining = false; // not able to schedule any move
           }
         }
       }
@@ -453,9 +451,11 @@ public class Mover {
 
   static int run(Map<URI, List<Path>> namenodes, Configuration conf)
       throws IOException, InterruptedException {
-    final long sleeptime = 2000 * conf.getLong(
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
-        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    final long sleeptime =
+        conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+            DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
+        conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
     LOG.info("namenodes = " + namenodes);
     
     List<NameNodeConnector> connectors = Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index da7306c..158c225 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -256,7 +256,7 @@ public class TestBlockStoragePolicy {
 
     final short replication = 3;
     {
-      final List<StorageType> chosen = Arrays.asList(); 
+      final List<StorageType> chosen = Lists.newArrayList();
       method.checkChooseStorageTypes(hot, replication, chosen,
           StorageType.DISK, StorageType.DISK, StorageType.DISK);
       method.checkChooseStorageTypes(warm, replication, chosen,
@@ -393,7 +393,7 @@ public class TestBlockStoragePolicy {
     final EnumSet<StorageType> unavailables = disk;
     final boolean isNewBlock = true;
     {
-      final List<StorageType> chosen = Arrays.asList(); 
+      final List<StorageType> chosen = Lists.newArrayList();
       checkChooseStorageTypes(hot, replication, chosen, unavailables, 
isNewBlock);
       checkChooseStorageTypes(warm, replication, chosen, unavailables, 
isNewBlock,
           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
@@ -500,7 +500,7 @@ public class TestBlockStoragePolicy {
 
     final short replication = 3;
     {
-      final List<StorageType> chosen = Arrays.asList(); 
+      final List<StorageType> chosen = Lists.newArrayList();
       method.checkChooseStorageTypes(hot, replication, chosen,
           StorageType.DISK, StorageType.DISK, StorageType.DISK);
       method.checkChooseStorageTypes(warm, replication, chosen,
@@ -603,7 +603,7 @@ public class TestBlockStoragePolicy {
     final EnumSet<StorageType> unavailables = disk;
     final boolean isNewBlock = false;
     {
-      final List<StorageType> chosen = Arrays.asList(); 
+      final List<StorageType> chosen = Lists.newArrayList();
       checkChooseStorageTypes(hot, replication, chosen, unavailables, 
isNewBlock,
           StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE);
       checkChooseStorageTypes(warm, replication, chosen, unavailables, 
isNewBlock,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d85f7e5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
index fda744f..d5d5cab 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java
@@ -44,9 +44,13 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
@@ -66,6 +70,8 @@ public class TestStorageMover {
         ).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)LogFactory.getLog(Dispatcher.class)
         ).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(DataTransferProtocol.class)).getLogger()
+        .setLevel(Level.ALL);
   }
 
   private static final int BLOCK_SIZE = 1024;
@@ -80,6 +86,8 @@ public class TestStorageMover {
   static {
     DEFAULT_CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     DEFAULT_CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    DEFAULT_CONF.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+        2L);
     DEFAULT_CONF.setLong(DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, 2000L);
 
     DEFAULT_POLICIES = BlockStoragePolicy.readBlockStorageSuite(DEFAULT_CONF);
@@ -87,6 +95,9 @@ public class TestStorageMover {
     WARM = DEFAULT_POLICIES.getPolicy("WARM");
     COLD = DEFAULT_POLICIES.getPolicy("COLD");
     Dispatcher.setBlockMoveWaitTime(1000L);
+    Dispatcher.setDelayAfterErrors(1000L);
+    // do not create id file since we will eat up all the disk space
+    NameNodeConnector.setCreateIdFile(false);
   }
 
   /**
@@ -151,7 +162,7 @@ public class TestStorageMover {
 
     ClusterScheme() {
       this(DEFAULT_CONF, NUM_DATANODES, REPL,
-          genStorageTypes(NUM_DATANODES, 1, 1), null);
+          genStorageTypes(NUM_DATANODES), null);
     }
 
     ClusterScheme(Configuration conf, int numDataNodes, short repl,
@@ -195,7 +206,7 @@ public class TestStorageMover {
       dfs = cluster.getFileSystem();
     }
 
-    private void runBasicTest(boolean shotdown) throws Exception {
+    private void runBasicTest(boolean shutdown) throws Exception {
       setupCluster();
       try {
         prepareNamespace();
@@ -205,7 +216,7 @@ public class TestStorageMover {
         migrate();
         verify(true);
       } finally {
-        if (shotdown) {
+        if (shutdown) {
           shutdownCluster();
         }
       }
@@ -233,7 +244,7 @@ public class TestStorageMover {
     /**
      * Run the migration tool.
      */
-    void migrate(String... args) throws Exception {
+    void migrate() throws Exception {
       runMover();
       Thread.sleep(5000); // let the NN finish deletion
     }
@@ -242,6 +253,9 @@ public class TestStorageMover {
      * Verify block locations after running the migration tool.
      */
     void verify(boolean verifyAll) throws Exception {
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(dn);
+      }
       if (verifyAll) {
         verifyNamespace();
       } else {
@@ -308,7 +322,8 @@ public class TestStorageMover {
         final Mover.StorageTypeDiff diff = new Mover.StorageTypeDiff(types,
             lb.getStorageTypes());
         Assert.assertTrue(fileStatus.getFullName(parent.toString())
-            + " with policy " + policy + " has non-empty overlap: " + diff,
+            + " with policy " + policy + " has non-empty overlap: " + diff
+            + ", the corresponding block is " + lb.getBlock().getLocalBlock(),
             diff.removeOverlap());
       }
     }
@@ -378,6 +393,7 @@ public class TestStorageMover {
       return "[disk=" + disk + ", archive=" + archive + "]";
     }
   }
+
   private static StorageType[][] genStorageTypes(int numDataNodes) {
     return genStorageTypes(numDataNodes, 0, 0);
   }
@@ -414,21 +430,6 @@ public class TestStorageMover {
     return capacities;
   }
 
-  /**
-   * A normal case for Mover: move a file into archival storage
-   */
-  @Test
-  public void testMigrateFileToArchival() throws Exception {
-    final Path foo = new Path("/foo");
-    Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
-    policyMap.put(foo, COLD);
-    NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo),
-        2*BLOCK_SIZE, null, policyMap);
-    ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
-        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
-    new MigrationTest(clusterScheme, nsScheme).runBasicTest(true);
-  }
-
   private static class PathPolicyMap {
     final Map<Path, BlockStoragePolicy> map = Maps.newHashMap();
     final Path hot = new Path("/hot");
@@ -447,13 +448,13 @@ public class TestStorageMover {
         }
       }
     }
-    
+
     NamespaceScheme newNamespaceScheme() {
       return new NamespaceScheme(Arrays.asList(hot, warm, cold),
           files, BLOCK_SIZE/2, null, map);
     }
-    
-    /** 
+
+    /**
      * Move hot files to warm and cold, warm files to hot and cold,
      * and cold files to hot and warm.
      */
@@ -473,21 +474,41 @@ public class TestStorageMover {
   }
 
   /**
+   * A normal case for Mover: move a file into archival storage
+   */
+  @Test
+  public void testMigrateFileToArchival() throws Exception {
+    LOG.info("testMigrateFileToArchival");
+    final Path foo = new Path("/foo");
+    Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
+    policyMap.put(foo, COLD);
+    NamespaceScheme nsScheme = new NamespaceScheme(null, Arrays.asList(foo),
+        2*BLOCK_SIZE, null, policyMap);
+    ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
+    new MigrationTest(clusterScheme, nsScheme).runBasicTest(true);
+  }
+
+  /**
    * Test directories with Hot, Warm and Cold polices.
    */
   @Test
   public void testHotWarmColdDirs() throws Exception {
+    LOG.info("testHotWarmColdDirs");
     PathPolicyMap pathPolicyMap = new PathPolicyMap(3);
     NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
     ClusterScheme clusterScheme = new ClusterScheme();
     MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
 
-    test.runBasicTest(false);
+    try {
+      test.runBasicTest(false);
+      pathPolicyMap.moveAround(test.dfs);
+      test.migrate();
 
-    pathPolicyMap.moveAround(test.dfs);
-    test.migrate();
-    test.verify(true);
-    test.shutdownCluster();
+      test.verify(true);
+    } finally {
+      test.shutdownCluster();
+    }
   }
 
   /**
@@ -495,76 +516,81 @@ public class TestStorageMover {
    */
   @Test
   public void testNoSpaceDisk() throws Exception {
+    LOG.info("testNoSpaceDisk");
     final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
     final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
 
-    final long diskCapacity = (10 + 
HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE;
-    final long archiveCapacity = 100*BLOCK_SIZE;
+    final long diskCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
+        * BLOCK_SIZE;
+    final long archiveCapacity = 100 * BLOCK_SIZE;
     final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
         diskCapacity, archiveCapacity);
-    final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+    Configuration conf = new Configuration(DEFAULT_CONF);
+    final ClusterScheme clusterScheme = new ClusterScheme(conf,
         NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
     final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
 
-    test.runBasicTest(false);
-
-    // create hot files with replication 3 until not more spaces.
-    final short replication = 3;
-    {
-      int hotFileCount = 0;
-      try {
-        for(; ; hotFileCount++) {
-          final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount);
-          DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+    try {
+      test.runBasicTest(false);
+
+      // create hot files with replication 3 until not more spaces.
+      final short replication = 3;
+      {
+        int hotFileCount = 0;
+        try {
+          for (; ; hotFileCount++) {
+            final Path p = new Path(pathPolicyMap.hot, "file" + hotFileCount);
+            DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+          }
+        } catch (IOException e) {
+          LOG.info("Expected: hotFileCount=" + hotFileCount, e);
         }
-      } catch(IOException e) {
-        LOG.info("Expected: hotFileCount=" + hotFileCount, e);
+        Assert.assertTrue(hotFileCount >= 1);
       }
-      Assert.assertTrue(hotFileCount >= 2);
-    }
 
-    // create hot files with replication 1 to use up all remaining spaces.
-    {
-      int hotFileCount_r1 = 0;
-      try {
-        for(; ; hotFileCount_r1++) {
-          final Path p = new Path(pathPolicyMap.hot, "file_r1_" + 
hotFileCount_r1);
-          DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L);
+      // create hot files with replication 1 to use up all remaining spaces.
+      {
+        int hotFileCount_r1 = 0;
+        try {
+          for (; ; hotFileCount_r1++) {
+            final Path p = new Path(pathPolicyMap.hot, "file_r1_" + 
hotFileCount_r1);
+            DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L);
+          }
+        } catch (IOException e) {
+          LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e);
         }
-      } catch(IOException e) {
-        LOG.info("Expected: hotFileCount_r1=" + hotFileCount_r1, e);
       }
-    }
 
-    { // test increasing replication.  Since DISK is full,
-      // new replicas should be stored in ARCHIVE as a fallback storage.
-      final Path file0 = new Path(pathPolicyMap.hot, "file0");
-      final Replication r = test.getReplication(file0);
-      final short newReplication = (short)5;
-      test.dfs.setReplication(file0, newReplication);
-      Thread.sleep(10000);
-      test.verifyReplication(file0, r.disk, newReplication - r.disk);
-    }
+      { // test increasing replication.  Since DISK is full,
+        // new replicas should be stored in ARCHIVE as a fallback storage.
+        final Path file0 = new Path(pathPolicyMap.hot, "file0");
+        final Replication r = test.getReplication(file0);
+        final short newReplication = (short) 5;
+        test.dfs.setReplication(file0, newReplication);
+        Thread.sleep(10000);
+        test.verifyReplication(file0, r.disk, newReplication - r.disk);
+      }
 
-    { // test creating a cold file and then increase replication
-      final Path p = new Path(pathPolicyMap.cold, "foo");
-      DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
-      test.verifyReplication(p, 0, replication);
+      { // test creating a cold file and then increase replication
+        final Path p = new Path(pathPolicyMap.cold, "foo");
+        DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+        test.verifyReplication(p, 0, replication);
 
-      final short newReplication = 5;
-      test.dfs.setReplication(p, newReplication);
-      Thread.sleep(10000);
-      test.verifyReplication(p, 0, newReplication);
-    }
+        final short newReplication = 5;
+        test.dfs.setReplication(p, newReplication);
+        Thread.sleep(10000);
+        test.verifyReplication(p, 0, newReplication);
+      }
 
-    { //test move a hot file to warm
-      final Path file1 = new Path(pathPolicyMap.hot, "file1");
-      test.dfs.rename(file1, pathPolicyMap.warm);
-      test.migrate();
-      test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());;
+      { //test move a hot file to warm
+        final Path file1 = new Path(pathPolicyMap.hot, "file1");
+        test.dfs.rename(file1, pathPolicyMap.warm);
+        test.migrate();
+        test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
+      }
+    } finally {
+      test.shutdownCluster();
     }
-
-    test.shutdownCluster();
   }
 
   /**
@@ -572,73 +598,77 @@ public class TestStorageMover {
    */
   @Test
   public void testNoSpaceArchive() throws Exception {
+    LOG.info("testNoSpaceArchive");
     final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
     final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
 
-    final long diskCapacity = 100*BLOCK_SIZE;
-    final long archiveCapacity = (10 + 
HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE;
+    final long diskCapacity = 100 * BLOCK_SIZE;
+    final long archiveCapacity = (6 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)
+        * BLOCK_SIZE;
     final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
         diskCapacity, archiveCapacity);
     final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
         NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES, 1, 1), capacities);
     final MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
 
-    test.runBasicTest(false);
-
-    // create cold files with replication 3 until not more spaces.
-    final short replication = 3;
-    {
-      int coldFileCount = 0;
-      try {
-        for(; ; coldFileCount++) {
-          final Path p = new Path(pathPolicyMap.cold, "file" + coldFileCount);
-          DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+    try {
+      test.runBasicTest(false);
+
+      // create cold files with replication 3 until not more spaces.
+      final short replication = 3;
+      {
+        int coldFileCount = 0;
+        try {
+          for (; ; coldFileCount++) {
+            final Path p = new Path(pathPolicyMap.cold, "file" + 
coldFileCount);
+            DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, replication, 0L);
+          }
+        } catch (IOException e) {
+          LOG.info("Expected: coldFileCount=" + coldFileCount, e);
         }
-      } catch(IOException e) {
-        LOG.info("Expected: coldFileCount=" + coldFileCount, e);
+        Assert.assertTrue(coldFileCount >= 1);
       }
-      Assert.assertTrue(coldFileCount >= 2);
-    }
 
-    // create cold files with replication 1 to use up all remaining spaces.
-    {
-      int coldFileCount_r1 = 0;
-      try {
-        for(; ; coldFileCount_r1++) {
-          final Path p = new Path(pathPolicyMap.cold, "file_r1_" + 
coldFileCount_r1);
-          DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)1, 0L);
+      // create cold files with replication 1 to use up all remaining spaces.
+      {
+        int coldFileCount_r1 = 0;
+        try {
+          for (; ; coldFileCount_r1++) {
+            final Path p = new Path(pathPolicyMap.cold, "file_r1_" + 
coldFileCount_r1);
+            DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 1, 0L);
+          }
+        } catch (IOException e) {
+          LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e);
         }
-      } catch(IOException e) {
-        LOG.info("Expected: coldFileCount_r1=" + coldFileCount_r1, e);
       }
-    }
 
-    { // test increasing replication but new replicas cannot be created
-      // since no more ARCHIVE space.
-      final Path file0 = new Path(pathPolicyMap.cold, "file0");
-      final Replication r = test.getReplication(file0);
-      LOG.info("XXX " + file0 + ": replication=" + r);
-      Assert.assertEquals(0, r.disk);
+      { // test increasing replication but new replicas cannot be created
+        // since no more ARCHIVE space.
+        final Path file0 = new Path(pathPolicyMap.cold, "file0");
+        final Replication r = test.getReplication(file0);
+        LOG.info("XXX " + file0 + ": replication=" + r);
+        Assert.assertEquals(0, r.disk);
 
-      final short newReplication = (short)5;
-      test.dfs.setReplication(file0, newReplication);
-      Thread.sleep(10000);
+        final short newReplication = (short) 5;
+        test.dfs.setReplication(file0, newReplication);
+        Thread.sleep(10000);
 
-      test.verifyReplication(file0, 0, r.archive);
-    }
+        test.verifyReplication(file0, 0, r.archive);
+      }
 
-    { // test creating a hot file
-      final Path p = new Path(pathPolicyMap.hot, "foo");
-      DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short)3, 0L);
-    }
+      { // test creating a hot file
+        final Path p = new Path(pathPolicyMap.hot, "foo");
+        DFSTestUtil.createFile(test.dfs, p, BLOCK_SIZE, (short) 3, 0L);
+      }
 
-    { //test move a cold file to warm
-      final Path file1 = new Path(pathPolicyMap.hot, "file1");
-      test.dfs.rename(file1, pathPolicyMap.warm);
-      test.migrate();
-      test.verify(true);
+      { //test move a cold file to warm
+        final Path file1 = new Path(pathPolicyMap.cold, "file1");
+        test.dfs.rename(file1, pathPolicyMap.warm);
+        test.migrate();
+        test.verify(true);
+      }
+    } finally {
+      test.shutdownCluster();
     }
-
-    test.shutdownCluster();
   }
 }

Reply via email to