HDFS-6961. Archival Storage: BlockPlacementPolicy#chooseTarget should check each valid storage type in each choosing round.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e08701ec
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e08701ec
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e08701ec

Branch: refs/heads/trunk
Commit: e08701ec71f7c10d8f15122d90c35f9f22e40837
Parents: 45d5b13
Author: Jing Zhao <ji...@apache.org>
Authored: Thu Sep 4 14:19:32 2014 -0700
Committer: Jing Zhao <ji...@apache.org>
Committed: Thu Sep 4 14:19:32 2014 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/BlockStoragePolicy.java  |   4 +-
 .../server/blockmanagement/BlockManager.java    |   2 +-
 .../BlockPlacementPolicyDefault.java            | 190 +++++++++++--------
 .../BlockPlacementPolicyWithNodeGroup.java      |  73 +++----
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  27 ++-
 .../hadoop/hdfs/TestBlockStoragePolicy.java     |  71 +++++--
 6 files changed, 240 insertions(+), 127 deletions(-)
----------------------------------------------------------------------
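
Summary of the change: chooseTarget() used to consume an ordered
List<StorageType> one entry per choosing round, so a round could only try
the type at the head of the list. It now collapses the required types into
an EnumMap of per-type counts and lets every round try each storage type
that still needs a replica. A minimal standalone sketch of that counting
step (not part of the commit; the nested enum is a stand-in for
org.apache.hadoop.hdfs.StorageType):

    import java.util.Arrays;
    import java.util.EnumMap;
    import java.util.List;

    public class RequiredTypesSketch {
      enum StorageType { DISK, SSD, ARCHIVE }

      // Mirrors the getRequiredStorageTypes() helper added below.
      static EnumMap<StorageType, Integer> getRequiredStorageTypes(
          List<StorageType> types) {
        EnumMap<StorageType, Integer> map =
            new EnumMap<StorageType, Integer>(StorageType.class);
        for (StorageType type : types) {
          Integer num = map.get(type);
          map.put(type, num == null ? 1 : num + 1);
        }
        return map;
      }

      public static void main(String[] args) {
        // [SSD, DISK, DISK] collapses to {DISK=2, SSD=1}: a round may now
        // satisfy either type, not just the head of the original list.
        System.out.println(getRequiredStorageTypes(Arrays.asList(
            StorageType.SSD, StorageType.DISK, StorageType.DISK)));
      }
    }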


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
index b119359..a72d4f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java
@@ -23,6 +23,7 @@ import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -99,7 +100,8 @@ public class BlockStoragePolicy {
   /** The fallback storage type for replication. */
   private final StorageType[] replicationFallbacks;
 
-  BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
+  @VisibleForTesting
+  public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
       StorageType[] creationFallbacks, StorageType[] replicationFallbacks) {
     this.id = id;
     this.name = name;
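
The constructor is made public (and annotated @VisibleForTesting) so that
tests can build ad-hoc policies. For example, the new test added at the
bottom of this commit constructs one like this:

    // From testChooseTargetWithTopology(): an ad-hoc policy preferring
    // SSD, then DISK, then ARCHIVE, with no fallbacks configured.
    BlockStoragePolicy policy1 = new BlockStoragePolicy((byte) 9, "TEST1",
        new StorageType[]{StorageType.SSD, StorageType.DISK,
            StorageType.ARCHIVE},
        new StorageType[]{}, new StorageType[]{});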

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 389409b..af83653 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -453,7 +453,7 @@ public class BlockManager {
   }
 
   @VisibleForTesting
-  BlockPlacementPolicy getBlockPlacementPolicy() {
+  public BlockPlacementPolicy getBlockPlacementPolicy() {
     return blockplacement;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index a711a09..593ea90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -19,13 +19,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.util.Time.now;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -142,8 +136,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
       Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
           new HashSet<Node>() : new HashSet<Node>(excludedNodes);
-      final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
-          (short)numOfReplicas);
+      final List<StorageType> requiredStorageTypes = storagePolicy
+          .chooseStorageTypes((short)numOfReplicas);
+      final EnumMap<StorageType, Integer> storageTypes =
+          getRequiredStorageTypes(requiredStorageTypes);
 
       // Choose favored nodes
       List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
@@ -156,13 +152,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize, 
             getMaxNodesPerRack(results.size(), numOfReplicas)[1],
-            results, avoidStaleNodes, storageTypes.get(0), false);
+            results, avoidStaleNodes, storageTypes, false);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode); 
           continue;
         }
-        storageTypes.remove(0);
         favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
       }
 
@@ -241,6 +236,21 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return new int[] {numOfReplicas, maxNodesPerRack};
   }
 
+  private EnumMap<StorageType, Integer> getRequiredStorageTypes(
+      List<StorageType> types) {
+    EnumMap<StorageType, Integer> map = new EnumMap<StorageType,
+        Integer>(StorageType.class);
+    for (StorageType type : types) {
+      if (!map.containsKey(type)) {
+        map.put(type, 1);
+      } else {
+        int num = map.get(type);
+        map.put(type, num + 1);
+      }
+    }
+    return map;
+  }
+
   /**
    * choose <i>numOfReplicas</i> from all data nodes
    * @param numOfReplicas additional number of replicas wanted
@@ -272,17 +282,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     // Keep a copy of original excludedNodes
-    final Set<Node> oldExcludedNodes = avoidStaleNodes ? 
-        new HashSet<Node>(excludedNodes) : null;
+    final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
 
     // choose storage types; use fallbacks for unavailable storages
-    final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
-        (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results),
-        unavailableStorages, newBlock);
+    final List<StorageType> requiredStorageTypes = storagePolicy
+        .chooseStorageTypes((short) totalReplicasExpected,
+            DatanodeStorageInfo.toStorageTypes(results),
+            unavailableStorages, newBlock);
+    final EnumMap<StorageType, Integer> storageTypes =
+        getRequiredStorageTypes(requiredStorageTypes);
 
-    StorageType curStorageType = null;
     try {
-      if ((numOfReplicas = storageTypes.size()) == 0) {
+      if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
         throw new NotEnoughReplicasException(
             "All required storage types are unavailable: "
             + " unavailableStorages=" + unavailableStorages
@@ -290,9 +301,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       }
 
       if (numOfResults == 0) {
-        curStorageType = storageTypes.remove(0);
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, curStorageType, true)
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
                 .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -300,33 +310,30 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       }
       final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
       if (numOfResults <= 1) {
-        curStorageType = storageTypes.remove(0);
         chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-            results, avoidStaleNodes, curStorageType);
+            results, avoidStaleNodes, storageTypes);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 2) {
         final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
-        curStorageType = storageTypes.remove(0);
         if (clusterMap.isOnSameRack(dn0, dn1)) {
           chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, curStorageType);
+              results, avoidStaleNodes, storageTypes);
         } else if (newBlock){
           chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, curStorageType);
+              results, avoidStaleNodes, storageTypes);
         } else {
           chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, curStorageType);
+              results, avoidStaleNodes, storageTypes);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
-      curStorageType = storageTypes.remove(0);
       chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, curStorageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
      final String message = "Failed to place enough replicas, still in need of "
           + (totalReplicasExpected - results.size()) + " to reach "
@@ -355,10 +362,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             newBlock);
       }
 
-      if (storageTypes.size() > 0) {
-        // Retry chooseTarget with fallback storage types
-        unavailableStorages.add(curStorageType);
-        return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize,
+      boolean retry = false;
+      // simply add all the remaining types into unavailableStorages and give
+      // another try. No best effort is guaranteed here.
+      for (StorageType type : storageTypes.keySet()) {
+        if (!unavailableStorages.contains(type)) {
+          unavailableStorages.add(type);
+          retry = true;
+        }
+      }
+      if (retry) {
+        for (DatanodeStorageInfo resultStorage : results) {
+          addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
+              oldExcludedNodes);
+        }
+        numOfReplicas = totalReplicasExpected - results.size();
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
            maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
             newBlock);
       }
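
Note the new retry semantics in the catch block above: rather than retrying
per single failed storage type, every type still owed a replica is marked
unavailable, already-chosen nodes are excluded, and chooseTarget() is
retried once. A condensed, hypothetical equivalent of that bookkeeping
(standalone sketch, not the committed code):

    import java.util.EnumMap;
    import java.util.EnumSet;

    public class RetrySketch {
      enum StorageType { DISK, SSD, ARCHIVE } // stand-in for the HDFS enum

      // Returns true if at least one remaining required type was newly
      // marked unavailable, i.e. a retry with fallback types may help.
      static boolean markRemainingUnavailable(
          EnumMap<StorageType, Integer> remaining,
          EnumSet<StorageType> unavailableStorages) {
        boolean retry = false;
        for (StorageType type : remaining.keySet()) {
          if (!unavailableStorages.contains(type)) {
            unavailableStorages.add(type);
            retry = true;
          }
        }
        return retry;
      }
    }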
@@ -373,28 +392,35 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the chosen storage
    */
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
-                                             Set<Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeStorageInfo> results,
-                                             boolean avoidStaleNodes,
-                                             StorageType storageType,
-                                             boolean fallbackToLocalRack)
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-            localDatanode.getStorageInfos())) {
-          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
-            return localStorage;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+              localDatanode.getStorageInfos())) {
+            StorageType type = entry.getKey();
+            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              return localStorage;
+            }
           }
         }
       } 
@@ -405,7 +431,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
-        maxNodesPerRack, results, avoidStaleNodes, storageType);
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
   }
   
   /**
@@ -428,23 +454,23 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the chosen node
    */
   protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
-                                             Set<Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeStorageInfo> results,
-                                             boolean avoidStaleNodes,
-                                             StorageType storageType)
+                                                Set<Node> excludedNodes,
+                                                long blocksize,
+                                                int maxNodesPerRack,
+                                                List<DatanodeStorageInfo> results,
+                                                boolean avoidStaleNodes,
+                                                EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
       
     // choose one from the local rack
     try {
       return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -458,16 +484,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       if (newLocal != null) {
         try {
           return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+              blocksize, maxNodesPerRack, results, avoidStaleNodes,
+              storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }
@@ -486,18 +513,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                 int maxReplicasPerRack,
                                 List<DatanodeStorageInfo> results,
                                 boolean avoidStaleNodes,
-                                StorageType storageType)
+                                EnumMap<StorageType, Integer> storageTypes)
                                     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
       chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
           excludedNodes, blocksize, maxReplicasPerRack, results,
-          avoidStaleNodes, storageType);
+          avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results, avoidStaleNodes, storageType);
+                   maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
 
@@ -511,10 +538,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       int maxNodesPerRack,
       List<DatanodeStorageInfo> results,
       boolean avoidStaleNodes,
-      StorageType storageType)
+      EnumMap<StorageType, Integer> storageTypes)
           throws NotEnoughReplicasException {
     return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
-        results, avoidStaleNodes, storageType);
+        results, avoidStaleNodes, storageTypes);
   }
 
   /**
@@ -528,8 +555,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                             int maxNodesPerRack,
                             List<DatanodeStorageInfo> results,
                             boolean avoidStaleNodes,
-                            StorageType storageType)
-                                throws NotEnoughReplicasException {
+                            EnumMap<StorageType, Integer> storageTypes)
+                            throws NotEnoughReplicasException {
       
     int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
         scope, excludedNodes);
@@ -549,18 +576,31 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
         final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
             chosenNode.getStorageInfos());
-        int i;
-        for(i = 0; i < storages.length; i++) {
-          final int newExcludedNodes = addIfIsGoodTarget(storages[i],
-              excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
-              avoidStaleNodes, storageType);
-          if (newExcludedNodes >= 0) {
-            numOfReplicas--;
-            if (firstChosen == null) {
-              firstChosen = storages[i];
+        int i = 0;
+        boolean search = true;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); search && iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (i = 0; i < storages.length; i++) {
+            StorageType type = entry.getKey();
+            final int newExcludedNodes = addIfIsGoodTarget(storages[i],
+                excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
+                avoidStaleNodes, type);
+            if (newExcludedNodes >= 0) {
+              numOfReplicas--;
+              if (firstChosen == null) {
+                firstChosen = storages[i];
+              }
+              numOfAvailableNodes -= newExcludedNodes;
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              search = false;
+              break;
             }
-            numOfAvailableNodes -= newExcludedNodes;
-            break;
           }
         }
 
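
The chooseRandom() rewrite above nests the shuffled-storage scan inside a
walk over the remaining required types, decrementing a type's count (or
removing its entry) on the first accepted storage. A self-contained sketch
of the loop shape, with a placeholder predicate standing in for
addIfIsGoodTarget() and an early return in place of the committed code's
search flag:

    import java.util.EnumMap;
    import java.util.Iterator;
    import java.util.Map;

    public class ChooseLoopSketch {
      enum StorageType { DISK, SSD, ARCHIVE } // stand-in for the HDFS enum

      // Placeholder for addIfIsGoodTarget(...) >= 0; accepts everything.
      static boolean isGoodTarget(String storage, StorageType type) {
        return true;
      }

      // Try every still-required type against one node's shuffled storages;
      // stop as soon as one (type, storage) pair is accepted.
      static String chooseOne(EnumMap<StorageType, Integer> required,
                              String[] shuffledStorages) {
        for (Iterator<Map.Entry<StorageType, Integer>> iter =
                 required.entrySet().iterator(); iter.hasNext(); ) {
          Map.Entry<StorageType, Integer> entry = iter.next();
          for (String storage : shuffledStorages) {
            if (isGoodTarget(storage, entry.getKey())) {
              int num = entry.getValue();
              if (num == 1) {
                iter.remove();       // last needed replica of this type
              } else {
                entry.setValue(num - 1);
              }
              return storage;
            }
          }
        }
        return null; // no storage satisfied any remaining type
      }
    }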

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index b3ff6b9..60e192b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -70,22 +65,33 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType, boolean fallbackToLocalRack
-      ) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
+      throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
 
     // otherwise try local machine first
     if (localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-            localDataNode.getStorageInfos())) {
-          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
-            return localStorage;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+              localDataNode.getStorageInfos())) {
+            StorageType type = entry.getKey();
+            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              return localStorage;
+            }
           }
         }
       }
@@ -94,7 +100,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     // try a node on local node group
     DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     if (chosenStorage != null) {
       return chosenStorage;
     }
@@ -104,7 +110,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
   }
 
   /** @return the node of the second replica */
@@ -124,18 +130,19 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes) throws
+      NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local rack, but off-nodegroup
     try {
      final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
       return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
-          results, avoidStaleNodes, storageType);
+          results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       final DatanodeDescriptor newLocal = secondNode(localMachine, results);
@@ -143,16 +150,17 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
         try {
           return chooseRandom(
               clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+              blocksize, maxNodesPerRack, results, avoidStaleNodes,
+              storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }
@@ -161,8 +169,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected void chooseRemoteRack(int numOfReplicas,
       DatanodeDescriptor localMachine, Set<Node> excludedNodes,
      long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
-      boolean avoidStaleNodes, StorageType storageType)
-          throws NotEnoughReplicasException {
+      boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes)
+      throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
 
     final String rackLocation = NetworkTopology.getFirstHalf(
@@ -170,12 +178,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     try {
       // randomly choose from remote racks
       chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
       // fall back to the local rack
       chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
           rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
 
@@ -189,11 +197,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
       NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes) throws
+      NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local node group
@@ -201,7 +210,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
       return chooseRandom(
           clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
           excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
-          storageType);
+          storageTypes);
     } catch (NotEnoughReplicasException e1) {
       final DatanodeDescriptor newLocal = secondNode(localMachine, results);
       if (newLocal != null) {
@@ -209,16 +218,16 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
           return chooseRandom(
               clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
               excludedNodes, blocksize, maxNodesPerRack, results,
-              avoidStaleNodes, storageType);
+              avoidStaleNodes, storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index a57dd2f..777127e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -951,9 +951,14 @@ public class DFSTestUtil {
   public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) {
     return createDatanodeStorageInfos(n, null, null);
   }
-    
+
   public static DatanodeStorageInfo[] createDatanodeStorageInfos(
       int n, String[] racks, String[] hostnames) {
+    return createDatanodeStorageInfos(n, racks, hostnames, null);
+  }
+
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(
+      int n, String[] racks, String[] hostnames, StorageType[] types) {
     DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
     for(int i = storages.length; i > 0; ) {
       final String storageID = "s" + i;
@@ -961,16 +966,30 @@ public class DFSTestUtil {
       i--;
      final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack";
      final String hostname = (hostnames!=null && i < hostnames.length)? hostnames[i]: "host";
-      storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname);
+      final StorageType type = (types != null && i < types.length) ? types[i]
+          : StorageType.DEFAULT;
+      storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname,
+          type);
     }
     return storages;
   }
+
   public static DatanodeStorageInfo createDatanodeStorageInfo(
       String storageID, String ip, String rack, String hostname) {
-    final DatanodeStorage storage = new DatanodeStorage(storageID);
-      final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage, hostname);
+    return createDatanodeStorageInfo(storageID, ip, rack, hostname,
+        StorageType.DEFAULT);
+  }
+
+  public static DatanodeStorageInfo createDatanodeStorageInfo(
+      String storageID, String ip, String rack, String hostname,
+      StorageType type) {
+    final DatanodeStorage storage = new DatanodeStorage(storageID,
+        DatanodeStorage.State.NORMAL, type);
+    final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(
+        ip, rack, storage, hostname);
     return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
   }
+
   public static DatanodeDescriptor[] toDatanodeDescriptor(
       DatanodeStorageInfo[] storages) {
     DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];
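
The new overload lets a test pin one storage type per datanode. A usage
fragment mirroring the new testChooseTargetWithTopology() below (values
taken directly from that test):

    // Three single-storage datanodes typed DISK, SSD and ARCHIVE.
    DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
        3,
        new String[]{"/d1/r1", "/d1/r2", "/d1/r2"},
        new String[]{"host1", "host2", "host3"},
        new StorageType[]{StorageType.DISK, StorageType.SSD,
            StorageType.ARCHIVE});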

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e08701ec/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index 9fdc8cf..da7306c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -19,23 +19,26 @@ package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
 
+import java.io.File;
 import java.io.FileNotFoundException;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.blockmanagement.*;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -838,9 +841,7 @@ public class TestBlockStoragePolicy {
       checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
       checkDirectoryListing(barList, WARM, HOT);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      cluster.shutdown();
     }
   }
 
@@ -920,9 +921,7 @@ public class TestBlockStoragePolicy {
       checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
           HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      cluster.shutdown();
     }
   }
 
@@ -937,9 +936,7 @@ public class TestBlockStoragePolicy {
   private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum,
                                   int replicaNum, StorageType... types) {
     List<StorageType> typeList = Lists.newArrayList();
-    for (StorageType type : types) {
-      typeList.add(type);
-    }
+    Collections.addAll(typeList, types);
     LocatedBlocks lbs = status.getBlockLocations();
     Assert.assertEquals(blockNum, lbs.getLocatedBlocks().size());
     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
@@ -1029,4 +1026,50 @@ public class TestBlockStoragePolicy {
         new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
             StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
   }
+
+  @Test
+  public void testChooseTargetWithTopology() throws Exception {
+    BlockStoragePolicy policy1 = new BlockStoragePolicy((byte) 9, "TEST1",
+        new StorageType[]{StorageType.SSD, StorageType.DISK,
+            StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});
+    BlockStoragePolicy policy2 = new BlockStoragePolicy((byte) 11, "TEST2",
+        new StorageType[]{StorageType.DISK, StorageType.SSD,
+            StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});
+
+    final String[] racks = {"/d1/r1", "/d1/r2", "/d1/r2"};
+    final String[] hosts = {"host1", "host2", "host3"};
+    final StorageType[] types = {StorageType.DISK, StorageType.SSD,
+        StorageType.ARCHIVE};
+
+    final DatanodeStorageInfo[] storages = DFSTestUtil
+        .createDatanodeStorageInfos(3, racks, hosts, types);
+    final DatanodeDescriptor[] dataNodes = DFSTestUtil
+        .toDatanodeDescriptor(storages);
+
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        new File(baseDir, "name").getPath());
+    DFSTestUtil.formatNameNode(conf);
+    NameNode namenode = new NameNode(conf);
+
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy();
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    for (DatanodeDescriptor datanode : dataNodes) {
+      cluster.add(datanode);
+    }
+
+    DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
+        dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
+        new HashSet<Node>(), 0, policy1);
+    System.out.println(Arrays.asList(targets));
+    Assert.assertEquals(3, targets.length);
+    targets = replicator.chooseTarget("/foo", 3,
+        dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
+        new HashSet<Node>(), 0, policy2);
+    System.out.println(Arrays.asList(targets));
+    Assert.assertEquals(3, targets.length);
+  }
 }
