HDFS-6911. Archival Storage: check if a block is already scheduled in Mover. 
Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8ea20b53
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8ea20b53
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8ea20b53

Branch: refs/heads/HDFS-6581
Commit: 8ea20b53a861a2771c206afaacf8e7783568c4b1
Parents: 555900a
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Aug 27 10:38:10 2014 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Aug 27 10:38:10 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 50 +++++++-----
 .../hadoop/hdfs/server/balancer/Matcher.java    | 15 ++++
 .../apache/hadoop/hdfs/server/mover/Mover.java  | 65 ++++++++-------
 .../hadoop/hdfs/TestBlockStoragePolicy.java     | 79 +++++++++++++-----
 .../hadoop/hdfs/server/mover/TestMover.java     | 85 ++++++++++++++++++++
 5 files changed, 228 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
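
In outline, the change routes every Mover-initiated transfer through the
Dispatcher's pending-move bookkeeping, so a block that is already scheduled
(or already recorded in movedBlocks) cannot be scheduled twice. A minimal
sketch of the new scheduling path, using only names that appear in the diffs
below (illustrative, not part of the commit):

    // Source.addPendingMove() hands the candidate to the target StorageGroup;
    // the move is accepted only if the target datanode can take another
    // pending block and markMovedIfGoodBlock() confirms the block is a good
    // candidate that has not been moved or scheduled already.
    final PendingMove pm = source.addPendingMove(db, target);
    if (pm != null) {
      dispatcher.executePendingMove(pm);  // dispatch the actual transfer
    }
    // A second addPendingMove() for the same DBlock returns null.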


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ea20b53/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index bb498cc..41ea1f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -175,11 +175,7 @@ public class Dispatcher {
     private DDatanode proxySource;
     private StorageGroup target;
 
-    private PendingMove() {
-    }
-
-    public PendingMove(DBlock block, Source source, StorageGroup target) {
-      this.block = block;
+    private PendingMove(Source source, StorageGroup target) {
       this.source = source;
       this.target = target;
     }
@@ -199,9 +195,11 @@ public class Dispatcher {
      * @return true if a block and its proxy are chosen; false otherwise
      */
     private boolean chooseBlockAndProxy() {
+      // source and target must have the same storage type
+      final StorageType t = source.getStorageType();
       // iterate all source's blocks until find a good one
       for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
-        if (markMovedIfGoodBlock(i.next())) {
+        if (markMovedIfGoodBlock(i.next(), t)) {
           i.remove();
           return true;
         }
@@ -212,10 +210,10 @@ public class Dispatcher {
     /**
      * @return true if the given block is good for the tentative move.
      */
-    private boolean markMovedIfGoodBlock(DBlock block) {
+    private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
       synchronized (block) {
         synchronized (movedBlocks) {
-          if (isGoodBlockCandidate(source, target, block)) {
+          if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
             this.block = block;
             if (chooseProxySource()) {
               movedBlocks.put(block);
@@ -235,7 +233,7 @@ public class Dispatcher {
      * 
      * @return true if a proxy is found; otherwise false
      */
-    public boolean chooseProxySource() {
+    private boolean chooseProxySource() {
       final DatanodeInfo targetDN = target.getDatanodeInfo();
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
@@ -440,6 +438,18 @@ public class Dispatcher {
         scheduledSize = 0L;
       }
 
+      private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
+        if (getDDatanode().addPendingBlock(pm)) {
+          if (pm.markMovedIfGoodBlock(block, getStorageType())) {
+            incScheduledSize(pm.block.getNumBytes());
+            return pm;
+          } else {
+            getDDatanode().removePendingBlock(pm);
+          }
+        }
+        return null;
+      }
+
       /** @return the name for display */
       String getDisplayName() {
         return datanode + ":" + storageType;
@@ -599,8 +609,11 @@ public class Dispatcher {
 
     /** Decide if the given block is a good candidate to move or not */
     private boolean isGoodBlockCandidate(DBlock block) {
+      // source and target must have the same storage type
+      final StorageType sourceStorageType = getStorageType();
       for (Task t : tasks) {
-        if (Dispatcher.this.isGoodBlockCandidate(this, t.target, block)) {
+        if (Dispatcher.this.isGoodBlockCandidate(this, t.target,
+            sourceStorageType, block)) {
           return true;
         }
       }
@@ -620,11 +633,9 @@ public class Dispatcher {
       for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
         final Task task = i.next();
         final DDatanode target = task.target.getDDatanode();
-        PendingMove pendingBlock = new PendingMove();
+        final PendingMove pendingBlock = new PendingMove(this, task.target);
         if (target.addPendingBlock(pendingBlock)) {
           // target is not busy, so do a tentative block allocation
-          pendingBlock.source = this;
-          pendingBlock.target = task.target;
           if (pendingBlock.chooseBlockAndProxy()) {
             long blockSize = pendingBlock.block.getNumBytes();
             incScheduledSize(-blockSize);
@@ -641,6 +652,11 @@ public class Dispatcher {
       }
       return null;
     }
+    
+    /** Add a pending move */
+    public PendingMove addPendingMove(DBlock block, StorageGroup target) {
+      return target.addPendingMove(block, new PendingMove(this, target));
+    }
 
     /** Iterate all source's blocks to remove moved ones */
     private void removeMovedBlocks() {
@@ -901,12 +917,6 @@ public class Dispatcher {
     }
   }
 
-  private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
-      DBlock block) {
-    // match source and target storage type
-    return isGoodBlockCandidate(source, target, source.getStorageType(), block);
-  }
-
   /**
    * Decide if the block is a good candidate to be moved from source to target.
    * A block is a good candidate if 
@@ -914,7 +924,7 @@ public class Dispatcher {
    * 2. the block does not have a replica on the target;
    * 3. doing the move does not reduce the number of racks that the block has
    */
-  public boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
+  private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
       StorageType targetStorageType, DBlock block) {
     if (target.storageType != targetStorageType) {
       return false;
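
The refactoring above also makes the target storage type an explicit
parameter of isGoodBlockCandidate and narrows the method from public to
private. A hedged sketch of the two call patterns implied by this diff: the
Balancer keeps the storage type fixed, while the Mover asks for the target's
type.

    // Balancer path: source and target use the same storage type.
    isGoodBlockCandidate(source, target, sourceStorageType, block);

    // Mover path (via StorageGroup.addPendingMove -> markMovedIfGoodBlock):
    // the expected type is the target's, e.g. DISK -> ARCHIVE.
    pm.markMovedIfGoodBlock(block, target.getStorageType());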

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ea20b53/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
index 54febc6..f8d0071 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
@@ -31,6 +31,11 @@ public interface Matcher {
     public boolean match(NetworkTopology cluster, Node left, Node right) {
       return cluster.isOnSameNodeGroup(left, right);
     }
+
+    @Override
+    public String toString() {
+      return "SAME_NODE_GROUP";
+    }
   };
 
   /** Match datanodes in the same rack. */
@@ -39,6 +44,11 @@ public interface Matcher {
     public boolean match(NetworkTopology cluster, Node left, Node right) {
       return cluster.isOnSameRack(left, right);
     }
+
+    @Override
+    public String toString() {
+      return "SAME_RACK";
+    }
   };
 
   /** Match any datanode with any other datanode. */
@@ -47,5 +57,10 @@ public interface Matcher {
     public boolean match(NetworkTopology cluster, Node left, Node right) {
       return left != right;
     }
+
+    @Override
+    public String toString() {
+      return "ANY_OTHER";
+    }
   };
 }
\ No newline at end of file
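
The toString() overrides are a small debuggability improvement: the three
matcher singletons now print a stable name instead of an anonymous-class
identifier. A hypothetical use in a scheduling loop (the field names are
inferred from the toString() values; the logging itself is not part of this
commit):

    // Try the matchers from most to least local; the log now names each one.
    for (Matcher matcher : Arrays.asList(
        Matcher.SAME_NODE_GROUP, Matcher.SAME_RACK, Matcher.ANY_OTHER)) {
      LOG.info("trying matcher " + matcher);  // e.g. "trying matcher SAME_RACK"
    }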

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ea20b53/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 424998c..4dbe1d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -50,7 +50,7 @@ import java.util.*;
 public class Mover {
   static final Log LOG = LogFactory.getLog(Mover.class);
 
-  private static final Path MOVER_ID_PATH = new Path("/system/mover.id");
+  static final Path MOVER_ID_PATH = new Path("/system/mover.id");
 
   private static class StorageMap {
     private final StorageGroupMap<Source> sources
@@ -111,22 +111,25 @@ public class Mover {
     this.storages = new StorageMap();
     this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
   }
-  
-  private ExitStatus run() {
-    try {
-      final List<DatanodeStorageReport> reports = dispatcher.init();
-      for(DatanodeStorageReport r : reports) {
-        final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
-        for(StorageType t : StorageType.asList()) {
-          final long maxRemaining = getMaxRemaining(r, t);
-          if (maxRemaining > 0L) {
-            final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); 
-            final StorageGroup target = dn.addTarget(t, maxRemaining);
-            storages.add(source, target);
-          }
+
+  void init() throws IOException {
+    final List<DatanodeStorageReport> reports = dispatcher.init();
+    for(DatanodeStorageReport r : reports) {
+      final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+      for(StorageType t : StorageType.asList()) {
+        final long maxRemaining = getMaxRemaining(r, t);
+        if (maxRemaining > 0L) {
+          final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); 
+          final StorageGroup target = dn.addTarget(t, maxRemaining);
+          storages.add(source, target);
         }
       }
+    }
+  }
 
+  private ExitStatus run() {
+    try {
+      init();
       new Processor().processNamespace();
 
       return ExitStatus.IN_PROGRESS;
@@ -141,6 +144,14 @@ public class Mover {
     }
   }
 
+  DBlock newDBlock(Block block, List<MLocation> locations) {
+    final DBlock db = new DBlock(block);
+    for(MLocation ml : locations) {
+      db.addLocation(storages.getTarget(ml));
+    }
+    return db;
+  }
+
   private static long getMaxRemaining(DatanodeStorageReport report, StorageType t) {
     long max = 0L;
     for(StorageReport r : report.getStorageReports()) {
@@ -169,11 +180,11 @@ public class Mover {
     return sb.toString();
   }
 
-  private class Processor {
+  class Processor {
     private final DFSClient dfs;
     private final List<String> snapshottableDirs = new ArrayList<String>();
 
-    private Processor() {
+    Processor() {
       dfs = dispatcher.getDistributedFileSystem().getClient();
     }
 
@@ -290,15 +301,11 @@ public class Mover {
         }
       }
     }
-    
+
     void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
       final List<MLocation> locations = MLocation.toLocations(lb);
       Collections.shuffle(locations);
-      
-      final DBlock db = new DBlock(lb.getBlock().getLocalBlock());
-      for(MLocation ml : locations) {
-        db.addLocation(storages.getTarget(ml));
-      }
+      final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
 
       for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
         final StorageType t = i.next();
@@ -310,12 +317,18 @@ public class Mover {
             if (scheduleMoveReplica(db, ml, source, diff.expected)) {
               i.remove();
               j.remove();
+              return;
             }
           }
         }
       }
     }
 
+    boolean scheduleMoveReplica(DBlock db, MLocation ml,
+        List<StorageType> targetTypes) {
+      return scheduleMoveReplica(db, ml, storages.getSource(ml), targetTypes);
+    }
+
     boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
         List<StorageType> targetTypes) {
       if (dispatcher.getCluster().isNodeGroupAware()) {
@@ -341,12 +354,10 @@ public class Mover {
       for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
         final StorageType t = i.next();
         for(StorageGroup target : storages.getTargetStorages(t)) {
-          if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())
-              && dispatcher.isGoodBlockCandidate(source, target, t, db)) {
-            final PendingMove pm = dispatcher.new PendingMove(db, source, target);
-            if (pm.chooseProxySource()) {
+          if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())) {
+            final PendingMove pm = source.addPendingMove(db, target);
+            if (pm != null) {
               i.remove();
-              target.incScheduledSize(ml.size);
               dispatcher.executePendingMove(pm);
               return true;
             }
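
This hunk is the heart of HDFS-6911: scheduleMoveReplica no longer builds a
PendingMove itself and bumps the scheduled size by hand; it calls
source.addPendingMove(db, target), and the Dispatcher refuses a block that is
already scheduled. Assuming both calls below name the same replica, the
observable behavior is:

    // First call: the block is fresh, so it is marked in movedBlocks, the
    // scheduled size is incremented, and a PendingMove is returned.
    assert source.addPendingMove(db, target) != null;
    // Second call for the same DBlock: markMovedIfGoodBlock() finds the
    // block already recorded and refuses, so null comes back.
    assert source.addPendingMove(db, target) == null;

TestMover.testScheduleSameBlock (added below) asserts exactly this pair of
outcomes through Processor.scheduleMoveReplica.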

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ea20b53/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index f2c9fb1..ff5e995 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
 
 import java.io.FileNotFoundException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -62,20 +61,6 @@ public class TestBlockStoragePolicy {
   static final byte WARM = (byte) 8;
   static final byte HOT  = (byte) 12;
 
-  static final List<List<StorageType>> chosens = new ArrayList<List<StorageType>>();
-  static {
-    chosens.add(Arrays.<StorageType>asList());
-    chosens.add(Arrays.asList(StorageType.DISK));
-    chosens.add(Arrays.asList(StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK, StorageType.DISK));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE));
-  }
-
   @Test
   public void testDefaultPolicies() {
     final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
@@ -126,6 +111,17 @@ public class TestBlockStoragePolicy {
     }
   }
 
+  static StorageType[] newStorageTypes(int nDisk, int nArchive) {
+    final StorageType[] t = new StorageType[nDisk + nArchive];
+    Arrays.fill(t, 0, nDisk, StorageType.DISK);
+    Arrays.fill(t, nDisk, t.length, StorageType.ARCHIVE);
+    return t;
+  }
+
+  static List<StorageType> asList(int nDisk, int nArchive) {
+    return Arrays.asList(newStorageTypes(nDisk, nArchive));
+  }
+
   static void assertStorageType(List<StorageType> computed, short replication,
       StorageType... answers) {
     Assert.assertEquals(replication, computed.size());
@@ -369,10 +365,14 @@ public class TestBlockStoragePolicy {
     final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
 
     final short replication = 3;
-    for(List<StorageType> c : chosens) {
-      method.checkChooseStorageTypes(hot, replication, c);
-      method.checkChooseStorageTypes(warm, replication, c);
-      method.checkChooseStorageTypes(cold, replication, c);
+    for(int n = 0; n <= 3; n++) {
+      for(int d = 0; d <= n; d++) {
+        final int a = n - d;
+        final List<StorageType> chosen = asList(d, a);
+        method.checkChooseStorageTypes(hot, replication, chosen);
+        method.checkChooseStorageTypes(warm, replication, chosen);
+        method.checkChooseStorageTypes(cold, replication, chosen);
+      }
     }
   }
 
@@ -714,6 +714,47 @@ public class TestBlockStoragePolicy {
     Assert.assertArrayEquals(expected, computed);
   }
 
+  @Test
+  public void testChooseExcess() {
+    final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
+    final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
+    final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
+
+    final short replication = 3;
+    for(int n = 0; n <= 6; n++) {
+      for(int d = 0; d <= n; d++) {
+        final int a = n - d;
+        final List<StorageType> chosen = asList(d, a);
+        {
+          final int nDisk = Math.max(0, d - replication); 
+          final int nArchive = a;
+          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
+          checkChooseExcess(hot, replication, chosen, expected);
+        }
+
+        {
+          final int nDisk = Math.max(0, d - 1); 
+          final int nArchive = Math.max(0, a - replication + 1);
+          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
+          checkChooseExcess(warm, replication, chosen, expected);
+        }
+
+        {
+          final int nDisk = d; 
+          final int nArchive = Math.max(0, a - replication );
+          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
+          checkChooseExcess(cold, replication, chosen, expected);
+        }
+      }
+    }
+  }
+
+  static void checkChooseExcess(BlockStoragePolicy p, short replication,
+      List<StorageType> chosen, StorageType... expected) {
+    final List<StorageType> types = p.chooseExcess(replication, chosen);
+    assertStorageTypes(types, expected);
+  }
+
   private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) {
     Assert.assertEquals(stats.length, policies.length);
     for (int i = 0; i < stats.length; i++) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8ea20b53/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
new file mode 100644
index 0000000..da913e7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.mover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMover {
+  static Mover newMover(Configuration conf) throws IOException {
+    final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    Assert.assertEquals(1, namenodes.size());
+
+    final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
+        namenodes, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf);
+    return new Mover(nncs.get(0), conf);
+  }
+
+  @Test
+  public void testScheduleSameBlock() throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(4).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testScheduleSameBlock/file";
+      
+      {
+        final FSDataOutputStream out = dfs.create(new Path(file));
+        out.writeChars("testScheduleSameBlock");
+        out.close();
+      }
+
+      final Mover mover = newMover(conf);
+      mover.init();
+      final Mover.Processor processor = mover.new Processor();
+
+      final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      final List<MLocation> locations = MLocation.toLocations(lb);
+      final MLocation ml = locations.get(0);
+      final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations);
+
+      final List<StorageType> storageTypes = new ArrayList<StorageType>(
+          Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
+      Assert.assertTrue(processor.scheduleMoveReplica(db, ml, storageTypes));
+      Assert.assertFalse(processor.scheduleMoveReplica(db, ml, storageTypes));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
