http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 78c1149..d2eb85b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -18,17 +18,22 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import java.io.Closeable;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +50,8 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The class provides utilities for accessing a NameNode.
  */
@@ -53,6 +60,41 @@ public class NameNodeConnector implements Closeable {
   private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
 
   private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
+  private static boolean write2IdFile = true;
+  
+  /** Create {@link NameNodeConnector} for the given namenodes. */
+  public static List<NameNodeConnector> newNameNodeConnectors(
+      Collection<URI> namenodes, String name, Path idPath, Configuration conf)
+      throws IOException {
+    final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
+        namenodes.size());
+    for (URI uri : namenodes) {
+      NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath,
+          null, conf);
+      nnc.getKeyManager().startBlockKeyUpdater();
+      connectors.add(nnc);
+    }
+    return connectors;
+  }
+
+  public static List<NameNodeConnector> newNameNodeConnectors(
+      Map<URI, List<Path>> namenodes, String name, Path idPath,
+      Configuration conf) throws IOException {
+    final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
+        namenodes.size());
+    for (Map.Entry<URI, List<Path>> entry : namenodes.entrySet()) {
+      NameNodeConnector nnc = new NameNodeConnector(name, entry.getKey(),
+          idPath, entry.getValue(), conf);
+      nnc.getKeyManager().startBlockKeyUpdater();
+      connectors.add(nnc);
+    }
+    return connectors;
+  }
+
+  @VisibleForTesting
+  public static void setWrite2IdFile(boolean write2IdFile) {
+    NameNodeConnector.write2IdFile = write2IdFile;
+  }
 
   private final URI nameNodeUri;
   private final String blockpoolID;
@@ -62,17 +104,21 @@ public class NameNodeConnector implements Closeable {
   private final KeyManager keyManager;
   final AtomicBoolean fallbackToSimpleAuth = new AtomicBoolean(false);
 
-  private final FileSystem fs;
+  private final DistributedFileSystem fs;
   private final Path idPath;
   private final OutputStream out;
+  private final List<Path> targetPaths;
 
   private int notChangedIterations = 0;
 
   public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
-      Configuration conf) throws IOException {
+                           List<Path> targetPaths, Configuration conf)
+      throws IOException {
     this.nameNodeUri = nameNodeUri;
     this.idPath = idPath;
-    
+    this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
+        .asList(new Path("/")) : targetPaths;
+
     this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
         NamenodeProtocol.class).getProxy();
     this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
@@ -85,13 +131,18 @@ public class NameNodeConnector implements Closeable {
     final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
     this.keyManager = new KeyManager(blockpoolID, namenode,
         defaults.getEncryptDataTransfer(), conf);
-    // Exit if there is another one running.
-    out = checkAndMarkRunning(); 
+    // for tests, writing to the id file may be skipped (see setWrite2IdFile)
+    out = checkAndMarkRunning();
     if (out == null) {
+      // Exit if there is another one running.
       throw new IOException("Another " + name + " is running.");
     }
   }
 
+  public DistributedFileSystem getDistributedFileSystem() {
+    return fs;
+  }
+
   /** @return the block pool ID */
   public String getBlockpoolID() {
     return blockpoolID;
@@ -114,6 +165,11 @@ public class NameNodeConnector implements Closeable {
     return keyManager;
   }
 
+  /** @return the list of paths to scan/migrate */
+  public List<Path> getTargetPaths() {
+    return targetPaths;
+  }
+
   /** Should the instance continue running? */
   public boolean shouldContinue(long dispatchBlockMoveBytes) {
     if (dispatchBlockMoveBytes > 0) {
@@ -147,9 +203,11 @@ public class NameNodeConnector implements Closeable {
    */
   private OutputStream checkAndMarkRunning() throws IOException {
     try {
-      final DataOutputStream out = fs.create(idPath);
-      out.writeBytes(InetAddress.getLocalHost().getHostName());
-      out.flush();
+      final FSDataOutputStream out = fs.create(idPath);
+      if (write2IdFile) {
+        out.writeBytes(InetAddress.getLocalHost().getHostName());
+        out.hflush();
+      }
       return out;
     } catch(RemoteException e) {
      if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){

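A rough usage sketch for the new factory (the caller-side names, the
"/system/balancer.id" path, and DFSUtil.getNsServiceRpcUris(..) are
assumptions mirroring how the Balancer is expected to drive this, not part
of the patch):

    Configuration conf = new HdfsConfiguration();
    Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
    List<NameNodeConnector> connectors = NameNodeConnector
        .newNameNodeConnectors(namenodes, "balancer",
            new Path("/system/balancer.id"), conf);
    try {
      for (NameNodeConnector nnc : connectors) {
        // work against nnc.getBlockpoolID() / nnc.getKeyManager() here
      }
    } finally {
      for (NameNodeConnector nnc : connectors) {
        IOUtils.cleanup(null, nnc); // NameNodeConnector implements Closeable
      }
    }
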
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
index e215c17..d095a1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
@@ -60,6 +60,11 @@ public interface BlockCollection {
    */
   public short getBlockReplication();
 
+  /** 
+   * @return the storage policy ID.
+   */
+  public byte getStoragePolicyID();
+
   /**
    * Get the name of the collection.
    */

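Note that the new accessor returns a byte ID rather than a policy object; a
minimal sketch of the intended resolution against the suite (method names
taken from elsewhere in this patch; the helper itself is hypothetical):

    // resolve a file's byte policy ID to the concrete policy
    static BlockStoragePolicy resolvePolicy(BlockCollection bc,
        BlockStoragePolicy.Suite suite) {
      return suite.getPolicy(bc.getStoragePolicyID());
    }
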
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5d23c1f..7c18444 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -42,6 +42,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -254,6 +255,7 @@ public class BlockManager {
 
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
+  private final BlockStoragePolicy.Suite storagePolicySuite;
 
   /** Check whether name system is running before terminating */
   private boolean checkNSRunning = true;
@@ -276,6 +278,7 @@ public class BlockManager {
     blockplacement = BlockPlacementPolicy.getInstance(
         conf, stats, datanodeManager.getNetworkTopology(), 
         datanodeManager.getHost2DatanodeMap());
+    storagePolicySuite = BlockStoragePolicy.readBlockStorageSuite(conf);
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
      DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@@ -394,7 +397,11 @@ public class BlockManager {
           lifetimeMin*60*1000L, 0, null, encryptionAlgorithm);
     }
   }
-  
+
+  public BlockStoragePolicy getStoragePolicy(final String policyName) {
+    return storagePolicySuite.getPolicy(policyName);
+  }
+
   public void setBlockPoolId(String blockPoolId) {
     if (isBlockTokenEnabled()) {
       blockTokenSecretManager.setBlockPoolId(blockPoolId);
@@ -445,7 +452,7 @@ public class BlockManager {
     return datanodeManager;
   }
 
-  /** @return the BlockPlacementPolicy */
+  @VisibleForTesting
   public BlockPlacementPolicy getBlockPlacementPolicy() {
     return blockplacement;
   }
@@ -1366,7 +1373,7 @@ public class BlockManager {
       // choose replication targets: NOT HOLDING THE GLOBAL LOCK
      // It is costly to extract the filename for which chooseTargets is called,
       // so for now we pass in the block collection itself.
-      rw.chooseTargets(blockplacement, excludedNodes);
+      rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes);
     }
 
     namesystem.writeLock();
@@ -1470,24 +1477,48 @@ public class BlockManager {
     return scheduledWork;
   }
 
+  /** Choose target for WebHDFS redirection. */
+  public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src,
+      DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) {
+    return blockplacement.chooseTarget(src, 1, clientnode,
+        Collections.<DatanodeStorageInfo>emptyList(), false, excludes,
+        blocksize, storagePolicySuite.getDefaultPolicy());
+  }
+
+  /** Choose target for getting additional datanodes for an existing pipeline. */
+  public DatanodeStorageInfo[] chooseTarget4AdditionalDatanode(String src,
+      int numAdditionalNodes,
+      DatanodeDescriptor clientnode,
+      List<DatanodeStorageInfo> chosen,
+      Set<Node> excludes,
+      long blocksize,
+      byte storagePolicyID) {
+    
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
+    return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode,
+        chosen, true, excludes, blocksize, storagePolicy);
+  }
+
   /**
-   * Choose target datanodes according to the replication policy.
+   * Choose target datanodes for creating a new block.
    * 
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      List, boolean, Set, long, StorageType)
+   *      Set, long, List, BlockStoragePolicy)
    */
-  public DatanodeStorageInfo[] chooseTarget(final String src,
+  public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
       final Set<Node> excludedNodes,
-      final long blocksize, List<String> favoredNodes) throws IOException {
+      final long blocksize,
+      final List<String> favoredNodes,
+      final byte storagePolicyID) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors = 
         getDatanodeDescriptors(favoredNodes);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID);
     final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
         numOfReplicas, client, excludedNodes, blocksize, 
-        // TODO: get storage type from file
-        favoredDatanodeDescriptors, StorageType.DEFAULT);
+        favoredDatanodeDescriptors, storagePolicy);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to "
           + targets.length + " nodes instead of minReplication (="
@@ -2719,6 +2750,10 @@ public class BlockManager {
     assert namesystem.hasWriteLock();
     // first form a rack to datanodes map and
     BlockCollection bc = getBlockCollection(b);
+    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
+    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
+        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
+
 
     final Map<String, List<DatanodeStorageInfo>> rackMap
         = new HashMap<String, List<DatanodeStorageInfo>>();
@@ -2739,16 +2774,13 @@ public class BlockManager {
     final DatanodeStorageInfo addedNodeStorage
         = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
     while (nonExcess.size() - replication > 0) {
-      // check if we can delete delNodeHint
       final DatanodeStorageInfo cur;
-      if (firstOne && delNodeHintStorage != null
-          && (moreThanOne.contains(delNodeHintStorage)
-              || (addedNodeStorage != null
-                  && !moreThanOne.contains(addedNodeStorage)))) {
+      if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
+          moreThanOne, excessTypes)) {
         cur = delNodeHintStorage;
       } else { // regular excessive replica removal
         cur = replicator.chooseReplicaToDelete(bc, b, replication,
-                       moreThanOne, exactlyOne);
+            moreThanOne, exactlyOne, excessTypes);
       }
       firstOne = false;
 
@@ -2774,6 +2806,27 @@ public class BlockManager {
     }
   }
 
+  /** Check if we can use delHint */
+  static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint,
+      DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks,
+      List<StorageType> excessTypes) {
+    if (!isFirst) {
+      return false; // only consider delHint for the first case
+    } else if (delHint == null) {
+      return false; // no delHint
+    } else if (!excessTypes.contains(delHint.getStorageType())) {
+      return false; // delHint storage type is not an excess type
+    } else {
+      // check if removing delHint reduces the number of racks
+      if (moreThan1Racks.contains(delHint)) {
+        return true; // delHint and some other nodes are under the same rack 
+      } else if (added != null && !moreThan1Racks.contains(added)) {
+        return true; // the added node adds a new rack
+      }
+      return false; // removing delHint would reduce the number of racks
+    }
+  }
+
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid());
@@ -2880,7 +2933,7 @@ public class BlockManager {
     // Decrement number of blocks scheduled to this datanode.
     // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with 
     // RECEIVED_BLOCK), we currently also decrease the approximate number. 
-    node.decrementBlocksScheduled();
+    node.decrementBlocksScheduled(storageInfo.getStorageType());
 
     // get the deletion hint node
     DatanodeDescriptor delHintNode = null;
@@ -3549,10 +3602,12 @@ public class BlockManager {
     }
     
     private void chooseTargets(BlockPlacementPolicy blockplacement,
+        BlockStoragePolicy.Suite storagePolicySuite,
         Set<Node> excludedNodes) {
       targets = blockplacement.chooseTarget(bc.getName(),
           additionalReplRequired, srcNode, liveReplicaStorages, false,
-          excludedNodes, block.getNumBytes(), StorageType.DEFAULT);
+          excludedNodes, block.getNumBytes(),
+          storagePolicySuite.getPolicy(bc.getStoragePolicyID()));
     }
   }
 

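The useDelHint() rules above compress to: honor the deletion hint only on
the first removal, only when its storage type is actually in excess, and
only when deleting it does not shrink the set of racks holding the block. A
self-contained mirror with simplified stand-in types (not the HDFS classes):

    import java.util.Arrays;
    import java.util.List;

    class UseDelHintDemo {
      enum StorageType { DISK, SSD, ARCHIVE }

      // same decision as BlockManager.useDelHint, over plain strings
      static boolean useDelHint(boolean isFirst, String delHint,
          StorageType delHintType, String added, List<String> moreThan1Racks,
          List<StorageType> excessTypes) {
        if (!isFirst || delHint == null) {
          return false; // the hint applies to the first removal only
        }
        if (!excessTypes.contains(delHintType)) {
          return false; // the hint's storage type is not an excess type
        }
        // keep the rack count: delete the hint only if its rack still has
        // another replica, or if the added node introduced a new rack
        return moreThan1Racks.contains(delHint)
            || (added != null && !moreThan1Racks.contains(added));
      }

      public static void main(String[] args) {
        List<StorageType> excess = Arrays.asList(StorageType.DISK);
        List<String> moreThan1 = Arrays.asList("dn1", "dn2");
        System.out.println(useDelHint(true, "dn1", StorageType.DISK, null,
            moreThan1, excess)); // true: dn1's rack keeps another replica
        System.out.println(useDelHint(true, "dn3", StorageType.DISK, null,
            moreThan1, excess)); // false: deleting dn3 would lose a rack
      }
    }
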
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 0a4dd81..af58127 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -75,7 +76,7 @@ public abstract class BlockPlacementPolicy {
                                              boolean returnChosenNodes,
                                              Set<Node> excludedNodes,
                                              long blocksize,
-                                             StorageType storageType);
+                                             BlockStoragePolicy storagePolicy);
   
   /**
   * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
@@ -89,14 +90,14 @@ public abstract class BlockPlacementPolicy {
       Set<Node> excludedNodes,
       long blocksize,
       List<DatanodeDescriptor> favoredNodes,
-      StorageType storageType) {
+      BlockStoragePolicy storagePolicy) {
     // This class does not provide the functionality of placing
     // a block in favored datanodes. The implementations of this class
     // are expected to provide this functionality
 
     return chooseTarget(src, numOfReplicas, writer, 
         new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
-        excludedNodes, blocksize, storageType);
+        excludedNodes, blocksize, storagePolicy);
   }
 
   /**
@@ -118,18 +119,21 @@ public abstract class BlockPlacementPolicy {
    * @param srcBC block collection of file to which block-to-be-deleted belongs
    * @param block The block to be deleted
    * @param replicationFactor The required number of replicas for this block
-   * @param existingReplicas The replica locations of this block that are present
-                  on at least two unique racks. 
-   * @param moreExistingReplicas Replica locations of this block that are not
-                   listed in the previous parameter.
+   * @param moreThanOne The replica locations of this block that are present
+   *                    on more than one unique rack.
+   * @param exactlyOne Replica locations of this block that are present
+   *                    on exactly one unique rack.
+   * @param excessTypes The excess {@link StorageType}s according to the
+   *                    {@link BlockStoragePolicy}.
    * @return the replica that is the best candidate for deletion
    */
   abstract public DatanodeStorageInfo chooseReplicaToDelete(
       BlockCollection srcBC,
       Block block, 
       short replicationFactor,
-      Collection<DatanodeStorageInfo> existingReplicas,
-      Collection<DatanodeStorageInfo> moreExistingReplicas);
+      Collection<DatanodeStorageInfo> moreThanOne,
+      Collection<DatanodeStorageInfo> exactlyOne,
+      List<StorageType> excessTypes);
 
   /**
    * Used to setup a BlockPlacementPolicy object. This should be defined by 

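To make the renamed parameters concrete: replicas are bucketed by rack, and
a replica lands in moreThanOne when its rack holds several replicas of the
block, otherwise in exactlyOne. A stand-alone illustration (plain strings
instead of DatanodeStorageInfo):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class RackSplitDemo {
      public static void main(String[] args) {
        // replica -> rack (sample data)
        String[][] replicas = {
            {"dn1", "/rackA"}, {"dn2", "/rackA"}, {"dn3", "/rackB"}};
        Map<String, List<String>> rackMap = new HashMap<String, List<String>>();
        for (String[] r : replicas) {
          List<String> group = rackMap.get(r[1]);
          if (group == null) {
            group = new ArrayList<String>();
            rackMap.put(r[1], group);
          }
          group.add(r[0]);
        }
        List<String> moreThanOne = new ArrayList<String>();
        List<String> exactlyOne = new ArrayList<String>();
        for (List<String> group : rackMap.values()) {
          (group.size() > 1 ? moreThanOne : exactlyOne).addAll(group);
        }
        // prints moreThanOne=[dn1, dn2] exactlyOne=[dn3]
        System.out.println("moreThanOne=" + moreThanOne
            + " exactlyOne=" + exactlyOne);
      }
    }
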
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index f77d4ab..a0e6701 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -19,15 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.util.Time.now;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
@@ -80,12 +76,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    */
   protected int tolerateHeartbeatMultiplier;
 
-  protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
-                           NetworkTopology clusterMap, 
-                           Host2NodesMap host2datanodeMap) {
-    initialize(conf, stats, clusterMap, host2datanodeMap);
-  }
-
   protected BlockPlacementPolicyDefault() {
   }
     
@@ -117,9 +107,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                     boolean returnChosenNodes,
                                     Set<Node> excludedNodes,
                                     long blocksize,
-                                    StorageType storageType) {
+                                    final BlockStoragePolicy storagePolicy) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
-        excludedNodes, blocksize, storageType);
+        excludedNodes, blocksize, storagePolicy);
   }
 
   @Override
@@ -129,17 +119,21 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       Set<Node> excludedNodes,
       long blocksize,
       List<DatanodeDescriptor> favoredNodes,
-      StorageType storageType) {
+      BlockStoragePolicy storagePolicy) {
     try {
       if (favoredNodes == null || favoredNodes.size() == 0) {
         // Favored nodes not specified, fall back to regular block placement.
         return chooseTarget(src, numOfReplicas, writer,
             new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, 
-            excludedNodes, blocksize, storageType);
+            excludedNodes, blocksize, storagePolicy);
       }
 
       Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
           new HashSet<Node>() : new HashSet<Node>(excludedNodes);
+      final List<StorageType> requiredStorageTypes = storagePolicy
+          .chooseStorageTypes((short)numOfReplicas);
+      final EnumMap<StorageType, Integer> storageTypes =
+          getRequiredStorageTypes(requiredStorageTypes);
 
       // Choose favored nodes
       List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
@@ -152,7 +146,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize, 
             getMaxNodesPerRack(results.size(), numOfReplicas)[1],
-            results, avoidStaleNodes, storageType, false);
+            results, avoidStaleNodes, storageTypes, false);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode); 
@@ -166,7 +160,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         numOfReplicas -= results.size();
         DatanodeStorageInfo[] remainingTargets = 
             chooseTarget(src, numOfReplicas, writer, results,
-                false, favoriteAndExcludedNodes, blocksize, storageType);
+                false, favoriteAndExcludedNodes, blocksize, storagePolicy);
         for (int i = 0; i < remainingTargets.length; i++) {
           results.add(remainingTargets[i]);
         }
@@ -174,10 +168,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       return getPipeline(writer,
           results.toArray(new DatanodeStorageInfo[results.size()]));
     } catch (NotEnoughReplicasException nr) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose with favored nodes (=" + favoredNodes
+            + "), disregard favored nodes hint and retry.", nr);
+      }
       // Fall back to regular block placement disregarding favored nodes hint
       return chooseTarget(src, numOfReplicas, writer, 
           new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, 
-          excludedNodes, blocksize, storageType);
+          excludedNodes, blocksize, storagePolicy);
     }
   }
 
@@ -188,7 +186,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                     boolean returnChosenNodes,
                                     Set<Node> excludedNodes,
                                     long blocksize,
-                                    StorageType storageType) {
+                                    final BlockStoragePolicy storagePolicy) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return DatanodeStorageInfo.EMPTY_ARRAY;
     }
@@ -213,8 +211,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       
     boolean avoidStaleNodes = (stats != null
         && stats.isAvoidingStaleDataNodesForWrite());
-    Node localNode = chooseTarget(numOfReplicas, writer,
-        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+    final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
+        EnumSet.noneOf(StorageType.class), results.isEmpty());
     if (!returnChosenNodes) {  
       results.removeAll(chosenStorage);
     }
@@ -234,7 +233,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
     return new int[] {numOfReplicas, maxNodesPerRack};
   }
-    
+
+  private EnumMap<StorageType, Integer> getRequiredStorageTypes(
+      List<StorageType> types) {
+    EnumMap<StorageType, Integer> map = new EnumMap<StorageType,
+        Integer>(StorageType.class);
+    for (StorageType type : types) {
+      if (!map.containsKey(type)) {
+        map.put(type, 1);
+      } else {
+        int num = map.get(type);
+        map.put(type, num + 1);
+      }
+    }
+    return map;
+  }
+
   /**
    * choose <i>numOfReplicas</i> from all data nodes
    * @param numOfReplicas additional number of replicas wanted
@@ -247,31 +261,49 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return local node of writer (not chosen node)
    */
   private Node chooseTarget(int numOfReplicas,
-                                          Node writer,
-                                          Set<Node> excludedNodes,
-                                          long blocksize,
-                                          int maxNodesPerRack,
-                                          List<DatanodeStorageInfo> results,
-                                          final boolean avoidStaleNodes,
-                                          StorageType storageType) {
+                            Node writer,
+                            final Set<Node> excludedNodes,
+                            final long blocksize,
+                            final int maxNodesPerRack,
+                            final List<DatanodeStorageInfo> results,
+                            final boolean avoidStaleNodes,
+                            final BlockStoragePolicy storagePolicy,
+                            final EnumSet<StorageType> unavailableStorages,
+                            final boolean newBlock) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
     }
-    int totalReplicasExpected = numOfReplicas + results.size();
-      
-    int numOfResults = results.size();
-    boolean newBlock = (numOfResults==0);
+    final int numOfResults = results.size();
+    final int totalReplicasExpected = numOfReplicas + numOfResults;
     if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
       writer = results.get(0).getDatanodeDescriptor();
     }
 
     // Keep a copy of original excludedNodes
-    final Set<Node> oldExcludedNodes = avoidStaleNodes ? 
-        new HashSet<Node>(excludedNodes) : null;
+    final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
+
+    // choose storage types; use fallbacks for unavailable storages
+    final List<StorageType> requiredStorageTypes = storagePolicy
+        .chooseStorageTypes((short) totalReplicasExpected,
+            DatanodeStorageInfo.toStorageTypes(results),
+            unavailableStorages, newBlock);
+    final EnumMap<StorageType, Integer> storageTypes =
+        getRequiredStorageTypes(requiredStorageTypes);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("storageTypes=" + storageTypes);
+    }
+
     try {
+      if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
+        throw new NotEnoughReplicasException(
+            "All required storage types are unavailable: "
+            + " unavailableStorages=" + unavailableStorages
+            + ", storagePolicy=" + storagePolicy);
+      }
+
       if (numOfResults == 0) {
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType, true)
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
                 .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -280,7 +312,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
       if (numOfResults <= 1) {
         chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-            results, avoidStaleNodes, storageType);
+            results, avoidStaleNodes, storageTypes);
         if (--numOfReplicas == 0) {
           return writer;
         }
@@ -289,24 +321,28 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
         if (clusterMap.isOnSameRack(dn0, dn1)) {
           chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageType);
+              results, avoidStaleNodes, storageTypes);
         } else if (newBlock){
           chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageType);
+              results, avoidStaleNodes, storageTypes);
         } else {
           chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, storageType);
+              results, avoidStaleNodes, storageTypes);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
      final String message = "Failed to place enough replicas, still in need of "
           + (totalReplicasExpected - results.size()) + " to reach "
-          + totalReplicasExpected + ".";
+          + totalReplicasExpected
+          + " (unavailableStorages=" + unavailableStorages
+          + ", storagePolicy=" + storagePolicy
+          + ", newBlock=" + newBlock + ")";
+
       if (LOG.isTraceEnabled()) {
         LOG.trace(message, e);
       } else {
@@ -327,7 +363,28 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         // if the NotEnoughReplicasException was thrown in chooseRandom().
         numOfReplicas = totalReplicasExpected - results.size();
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
-            maxNodesPerRack, results, false, storageType);
+            maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
+            newBlock);
+      }
+
+      boolean retry = false;
+      // simply add all the remaining types into unavailableStorages and give
+      // another try. No best effort is guaranteed here.
+      for (StorageType type : storageTypes.keySet()) {
+        if (!unavailableStorages.contains(type)) {
+          unavailableStorages.add(type);
+          retry = true;
+        }
+      }
+      if (retry) {
+        for (DatanodeStorageInfo resultStorage : results) {
+          addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
+              oldExcludedNodes);
+        }
+        numOfReplicas = totalReplicasExpected - results.size();
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+            maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
+            newBlock);
       }
     }
     return writer;
@@ -340,28 +397,35 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the chosen storage
    */
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
-                                             Set<Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeStorageInfo> results,
-                                             boolean avoidStaleNodes,
-                                             StorageType storageType,
-                                             boolean fallbackToLocalRack)
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-            localDatanode.getStorageInfos())) {
-          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
-            return localStorage;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+              localDatanode.getStorageInfos())) {
+            StorageType type = entry.getKey();
+            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              return localStorage;
+            }
           }
         }
       } 
@@ -372,7 +436,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
-        maxNodesPerRack, results, avoidStaleNodes, storageType);
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
   }
   
   /**
@@ -395,50 +459,71 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the chosen node
    */
   protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
-                                             Set<Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeStorageInfo> results,
-                                             boolean avoidStaleNodes,
-                                             StorageType storageType)
+                                                Set<Node> excludedNodes,
+                                                long blocksize,
+                                                int maxNodesPerRack,
+                                                List<DatanodeStorageInfo> results,
+                                                boolean avoidStaleNodes,
+                                                EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
+    final String localRack = localMachine.getNetworkLocation();
       
-    // choose one from the local rack
     try {
-      return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
-    } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
+      // choose one from the local rack
+      return chooseRandom(localRack, excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    } catch (NotEnoughReplicasException e) {
+      // find the next replica and retry with its rack
       for(DatanodeStorageInfo resultStorage : results) {
         DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor();
         if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to choose from local rack (location = " + localRack
+                + "), retry with the rack of the next replica (location = "
+                + nextNode.getNetworkLocation() + ")", e);
+          }
+          return chooseFromNextRack(nextNode, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       }
-      if (newLocal != null) {
-        try {
-          return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
-        } catch(NotEnoughReplicasException e2) {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
-        }
-      } else {
-        //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose from local rack (location = " + localRack
+            + "); the second replica is not found, retry choosing randomly", e);
       }
+      //the second replica is not found, randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
-    
+
+  private DatanodeStorageInfo chooseFromNextRack(Node next,
+      Set<Node> excludedNodes,
+      long blocksize,
+      int maxNodesPerRack,
+      List<DatanodeStorageInfo> results,
+      boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException {
+    final String nextRack = next.getNetworkLocation();
+    try {
+      return chooseRandom(nextRack, excludedNodes, blocksize, maxNodesPerRack,
+          results, avoidStaleNodes, storageTypes);
+    } catch(NotEnoughReplicasException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose from the next rack (location = " + nextRack
+            + "), retry choosing randomly", e);
+      }
+      //otherwise randomly choose one from the network
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    }
+  }
+
   /** 
    * Choose <i>numOfReplicas</i> nodes from the racks 
    * that <i>localMachine</i> is NOT on.
@@ -453,18 +538,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                 int maxReplicasPerRack,
                                 List<DatanodeStorageInfo> results,
                                 boolean avoidStaleNodes,
-                                StorageType storageType)
+                                EnumMap<StorageType, Integer> storageTypes)
                                     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
       chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
           excludedNodes, blocksize, maxReplicasPerRack, results,
-          avoidStaleNodes, storageType);
+          avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Failed to choose remote rack (location = ~"
+            + localMachine.getNetworkLocation() + "), fallback to local rack", e);
+      }
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                   localMachine.getNetworkLocation(), excludedNodes, blocksize,
-                   maxReplicasPerRack, results, avoidStaleNodes, storageType);
+                   maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
 
@@ -478,10 +567,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       int maxNodesPerRack,
       List<DatanodeStorageInfo> results,
       boolean avoidStaleNodes,
-      StorageType storageType)
+      EnumMap<StorageType, Integer> storageTypes)
           throws NotEnoughReplicasException {
     return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
-        results, avoidStaleNodes, storageType);
+        results, avoidStaleNodes, storageTypes);
   }
 
   /**
@@ -495,8 +584,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                             int maxNodesPerRack,
                             List<DatanodeStorageInfo> results,
                             boolean avoidStaleNodes,
-                            StorageType storageType)
-                                throws NotEnoughReplicasException {
+                            EnumMap<StorageType, Integer> storageTypes)
+                            throws NotEnoughReplicasException {
       
     int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
         scope, excludedNodes);
@@ -512,24 +601,43 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       DatanodeDescriptor chosenNode = 
           (DatanodeDescriptor)clusterMap.chooseRandom(scope);
       if (excludedNodes.add(chosenNode)) { //was not in the excluded list
+        if (LOG.isDebugEnabled()) {
+          builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
+        }
         numOfAvailableNodes--;
 
         final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
             chosenNode.getStorageInfos());
-        int i;
-        for(i = 0; i < storages.length; i++) {
-          final int newExcludedNodes = addIfIsGoodTarget(storages[i],
-              excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
-              avoidStaleNodes, storageType);
-          if (newExcludedNodes >= 0) {
-            numOfReplicas--;
-            if (firstChosen == null) {
-              firstChosen = storages[i];
+        int i = 0;
+        boolean search = true;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); search && iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (i = 0; i < storages.length; i++) {
+            StorageType type = entry.getKey();
+            final int newExcludedNodes = addIfIsGoodTarget(storages[i],
+                excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
+                avoidStaleNodes, type);
+            if (newExcludedNodes >= 0) {
+              numOfReplicas--;
+              if (firstChosen == null) {
+                firstChosen = storages[i];
+              }
+              numOfAvailableNodes -= newExcludedNodes;
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              search = false;
+              break;
             }
-            numOfAvailableNodes -= newExcludedNodes;
-            break;
           }
         }
+        if (LOG.isDebugEnabled()) {
+          builder.append("\n]");
+        }
 
         // If no candidate storage was found on this DN then set badTarget.
         badTarget = (i == storages.length);
@@ -540,9 +648,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       String detail = enableDebugLogging;
       if (LOG.isDebugEnabled()) {
         if (badTarget && builder != null) {
-          detail = builder.append("]").toString();
+          detail = builder.toString();
           builder.setLength(0);
-        } else detail = "";
+        } else {
+          detail = "";
+        }
       }
       throw new NotEnoughReplicasException(detail);
     }
@@ -576,14 +686,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
  private static void logNodeIsNotChosen(DatanodeStorageInfo storage, String reason) {
     if (LOG.isDebugEnabled()) {
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       // build the error message for later use.
       debugLoggingBuilder.get()
-          .append(node).append(": ")
-          .append("Storage ").append(storage)
-          .append("at node ").append(NodeBase.getPath(node))
-          .append(" is not chosen because ")
-          .append(reason);
+          .append("\n  Storage ").append(storage)
+          .append(" is not chosen since ").append(reason).append(".");
     }
   }
 
@@ -608,11 +714,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                boolean considerLoad,
                                List<DatanodeStorageInfo> results,
                                boolean avoidStaleNodes,
-                               StorageType storageType) {
-    if (storage.getStorageType() != storageType) {
-      logNodeIsNotChosen(storage,
-          "storage types do not match, where the expected storage type is "
-              + storageType);
+                               StorageType requiredStorageType) {
+    if (storage.getStorageType() != requiredStorageType) {
+      logNodeIsNotChosen(storage, "storage types do not match,"
+          + " where the required storage type is " + requiredStorageType);
       return false;
     }
     if (storage.getState() == State.READ_ONLY_SHARED) {
@@ -634,9 +739,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     
     final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
-    final long scheduledSize = blockSize * node.getBlocksScheduled();
-    if (requiredSize > storage.getRemaining() - scheduledSize) {
-      logNodeIsNotChosen(storage, "the node does not have enough space ");
+    final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
+    final long remaining = node.getRemaining(storage.getStorageType());
+    if (requiredSize > remaining - scheduledSize) {
+      logNodeIsNotChosen(storage, "the node does not have enough "
+          + storage.getStorageType() + " space"
+          + " (required=" + requiredSize
+          + ", scheduled=" + scheduledSize
+          + ", remaining=" + remaining + ")");
       return false;
     }
 
@@ -645,8 +755,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
       final int nodeLoad = node.getXceiverCount();
       if (nodeLoad > maxLoad) {
-        logNodeIsNotChosen(storage,
-            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
+        logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad
+            + " > " + maxLoad + ") ");
         return false;
       }
     }
@@ -666,7 +776,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     return true;
   }
-    
+
   /**
    * Return a pipeline of nodes.
    * The pipeline is formed finding a shortest path that 
@@ -732,7 +842,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
       Block block, short replicationFactor,
       Collection<DatanodeStorageInfo> first,
-      Collection<DatanodeStorageInfo> second) {
+      Collection<DatanodeStorageInfo> second,
+      final List<StorageType> excessTypes) {
     long oldestHeartbeat =
       now() - heartbeatInterval * tolerateHeartbeatMultiplier;
     DatanodeStorageInfo oldestHeartbeatStorage = null;
@@ -742,6 +853,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     // Pick the node with the oldest heartbeat or with the least free space,
     // if all hearbeats are within the tolerable heartbeat interval
     for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
+      if (!excessTypes.contains(storage.getStorageType())) {
+        continue;
+      }
+
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       long free = node.getRemaining();
       long lastHeartbeat = node.getLastUpdate();
@@ -755,8 +870,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       }
     }
 
-    return oldestHeartbeatStorage != null? oldestHeartbeatStorage
-        : minSpaceStorage;
+    final DatanodeStorageInfo storage;
+    if (oldestHeartbeatStorage != null) {
+      storage = oldestHeartbeatStorage;
+    } else if (minSpaceStorage != null) {
+      storage = minSpaceStorage;
+    } else {
+      return null;
+    }
+    excessTypes.remove(storage.getStorageType());
+    return storage;
   }
 
   /**

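getRequiredStorageTypes() above collapses the policy's chosen storage-type
list into per-type counters that the choosers then decrement as targets are
found. A stand-alone sketch of that counting (local enum, not the HDFS
StorageType):

    import java.util.Arrays;
    import java.util.EnumMap;
    import java.util.List;

    class StorageTypeCountDemo {
      enum StorageType { DISK, SSD, ARCHIVE }

      // [DISK, DISK, ARCHIVE] -> {DISK=2, ARCHIVE=1}, as in the patch
      static EnumMap<StorageType, Integer> count(List<StorageType> types) {
        EnumMap<StorageType, Integer> map =
            new EnumMap<StorageType, Integer>(StorageType.class);
        for (StorageType t : types) {
          Integer n = map.get(t);
          map.put(t, n == null ? 1 : n + 1);
        }
        return map;
      }

      public static void main(String[] args) {
        System.out.println(count(Arrays.asList(
            StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE)));
      }
    }
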
http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index 2c8c37d..8626053 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -69,22 +64,33 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType, boolean fallbackToLocalRack
-      ) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
+      throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
 
     // otherwise try local machine first
     if (localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-            localDataNode.getStorageInfos())) {
-          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
-            return localStorage;
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+              localDataNode.getStorageInfos())) {
+            StorageType type = entry.getKey();
+            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              return localStorage;
+            }
           }
         }
       }
@@ -93,7 +99,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     // try a node on local node group
     DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     if (chosenStorage != null) {
       return chosenStorage;
     }
@@ -103,7 +109,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
   }
 
   /** @return the node of the second replica */
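
The chooseLocalStorage() hunk above replaces the single StorageType parameter with an EnumMap from storage type to the number of replicas still needed on that type: each successful placement decrements the matching count and removes the entry once it reaches zero. A minimal, self-contained sketch of that decrement-or-remove bookkeeping follows; the nested StorageType enum and the consumeOne() helper are illustrative stand-ins, not HDFS classes, and the real policy consults addIfIsGoodTarget() where this sketch simply accepts the first type.

import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;

public class StorageTypeCountsSketch {
  /** Illustrative stand-in for org.apache.hadoop.hdfs.StorageType. */
  enum StorageType { DISK, SSD, ARCHIVE }

  /** Consume one replica slot, mirroring the patched chooseLocalStorage(). */
  static StorageType consumeOne(EnumMap<StorageType, Integer> storageTypes) {
    for (Iterator<Map.Entry<StorageType, Integer>> iter =
        storageTypes.entrySet().iterator(); iter.hasNext(); ) {
      Map.Entry<StorageType, Integer> entry = iter.next();
      int num = entry.getValue();
      if (num == 1) {
        iter.remove();            // last replica of this type placed
      } else {
        entry.setValue(num - 1);  // more replicas of this type still needed
      }
      return entry.getKey();
    }
    return null;                  // all requested replicas placed
  }

  public static void main(String[] args) {
    EnumMap<StorageType, Integer> counts =
        new EnumMap<StorageType, Integer>(StorageType.class);
    counts.put(StorageType.DISK, 2);
    counts.put(StorageType.SSD, 1);
    for (StorageType t; (t = consumeOne(counts)) != null; ) {
      System.out.println("placed one replica on " + t);  // DISK, DISK, SSD
    }
  }
}
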
@@ -123,18 +129,19 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes) throws
+      NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local rack, but off-nodegroup
     try {
      final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
       return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
-          results, avoidStaleNodes, storageType);
+          results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       final DatanodeDescriptor newLocal = secondNode(localMachine, results);
@@ -142,16 +149,17 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
         try {
           return chooseRandom(
               clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+              blocksize, maxNodesPerRack, results, avoidStaleNodes,
+              storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }
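
chooseLocalRack() above derives its scope with NetworkTopology.getFirstHalf() because, under NetworkTopologyWithNodeGroup, a network location names both the rack and the node group (e.g. "/rack1/nodegroup0"); taking the first half keeps the random choice on the local rack while letting it leave the local node group. The helper below is a hypothetical re-implementation of that split for illustration only; the authoritative version is the Hadoop NetworkTopology method.

public class RackScopeSketch {
  /** Hypothetical sketch of the rack-scope split assumed above. */
  static String firstHalf(String networkLocation) {
    int lastSlash = networkLocation.lastIndexOf('/');
    // Locations with no node-group component are returned unchanged here.
    return lastSlash <= 0 ? networkLocation
        : networkLocation.substring(0, lastSlash);
  }

  public static void main(String[] args) {
    // Under NetworkTopologyWithNodeGroup a location carries rack + node group.
    System.out.println(firstHalf("/rack1/nodegroup0"));  // prints /rack1
  }
}
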
@@ -163,8 +171,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected void chooseRemoteRack(int numOfReplicas,
       DatanodeDescriptor localMachine, Set<Node> excludedNodes,
      long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
-      boolean avoidStaleNodes, StorageType storageType)
-          throws NotEnoughReplicasException {
+      boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes)
+      throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
 
     final String rackLocation = NetworkTopology.getFirstHalf(
@@ -172,12 +180,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     try {
       // randomly choose from remote racks
       chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
       // fall back to the local rack
       chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
           rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
 
@@ -191,11 +199,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
       NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes) throws
+      NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local node group
@@ -203,7 +212,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
       return chooseRandom(
           clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
           excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
-          storageType);
+          storageTypes);
     } catch (NotEnoughReplicasException e1) {
       final DatanodeDescriptor newLocal = secondNode(localMachine, results);
       if (newLocal != null) {
@@ -211,16 +220,16 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
           return chooseRandom(
               clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
               excludedNodes, blocksize, maxNodesPerRack, results,
-              avoidStaleNodes, storageType);
+              avoidStaleNodes, storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index f1730d4..be1ff14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -28,16 +28,19 @@ import java.util.Map;
 import java.util.Queue;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
@@ -204,8 +207,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
    * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
-  private int currApproxBlocksScheduled = 0;
-  private int prevApproxBlocksScheduled = 0;
+  private EnumCounters<StorageType> currApproxBlocksScheduled
+      = new EnumCounters<StorageType>(StorageType.class);
+  private EnumCounters<StorageType> prevApproxBlocksScheduled
+      = new EnumCounters<StorageType>(StorageType.class);
   private long lastBlocksScheduledRollTime = 0;
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
@@ -478,23 +483,46 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   /**
    * @return Approximate number of blocks currently scheduled to be written 
+   */
+  public long getRemaining(StorageType t) {
+    long remaining = 0;
+    for(DatanodeStorageInfo s : getStorageInfos()) {
+      if (s.getStorageType() == t) {
+        remaining += s.getRemaining();
+      }
+    }
+    return remaining;    
+  }
+
+  /**
+   * @return Approximate number of blocks currently scheduled to be written 
+   * to the given storage type of this datanode.
+   */
+  public int getBlocksScheduled(StorageType t) {
+    return (int)(currApproxBlocksScheduled.get(t)
+        + prevApproxBlocksScheduled.get(t));
+  }
+
+  /**
+   * @return Approximate number of blocks currently scheduled to be written 
    * to this datanode.
    */
   public int getBlocksScheduled() {
-    return currApproxBlocksScheduled + prevApproxBlocksScheduled;
+    return (int)(currApproxBlocksScheduled.sum()
+        + prevApproxBlocksScheduled.sum());
   }
 
   /** Increment the number of blocks scheduled. */
-  void incrementBlocksScheduled() {
-    currApproxBlocksScheduled++;
+  void incrementBlocksScheduled(StorageType t) {
+    currApproxBlocksScheduled.add(t, 1);
   }
   
   /** Decrement the number of blocks scheduled. */
-  void decrementBlocksScheduled() {
-    if (prevApproxBlocksScheduled > 0) {
-      prevApproxBlocksScheduled--;
-    } else if (currApproxBlocksScheduled > 0) {
-      currApproxBlocksScheduled--;
+  void decrementBlocksScheduled(StorageType t) {
+    if (prevApproxBlocksScheduled.get(t) > 0) {
+      prevApproxBlocksScheduled.subtract(t, 1);
+    } else if (currApproxBlocksScheduled.get(t) > 0) {
+      currApproxBlocksScheduled.subtract(t, 1);
     } 
     // it's ok if both counters are zero.
   }
@@ -502,8 +530,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** Adjusts curr and prev number of blocks scheduled every few minutes. */
   private void rollBlocksScheduled(long now) {
     if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
-      prevApproxBlocksScheduled = currApproxBlocksScheduled;
-      currApproxBlocksScheduled = 0;
+      prevApproxBlocksScheduled.set(currApproxBlocksScheduled);
+      currApproxBlocksScheduled.reset();
       lastBlocksScheduledRollTime = now;
     }
   }
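
The curr/prev pair above implements a sliding two-window estimate: increments land in the current window, decrements drain the previous window first, and every ten minutes the current counts roll into prev, so a scheduled write that is never reported ages out after at most two intervals. A minimal stand-alone sketch of that pattern follows; the Counters class and the nested StorageType enum are simplified stand-ins for org.apache.hadoop.hdfs.util.EnumCounters and the HDFS StorageType, not the real classes.

import java.util.EnumMap;

public class ScheduledBlocksSketch {
  /** Illustrative stand-in for org.apache.hadoop.hdfs.StorageType. */
  enum StorageType { DISK, SSD, ARCHIVE }

  /** Just enough of an EnumCounters-like type to show the roll mechanics. */
  static class Counters {
    private final EnumMap<StorageType, Long> map =
        new EnumMap<StorageType, Long>(StorageType.class);
    long get(StorageType t) { Long v = map.get(t); return v == null ? 0 : v; }
    void add(StorageType t, long d) { map.put(t, get(t) + d); }
    void set(Counters other) { map.clear(); map.putAll(other.map); }
    void reset() { map.clear(); }
  }

  private final Counters curr = new Counters();
  private final Counters prev = new Counters();
  private long lastRollTime = 0;
  static final long ROLL_INTERVAL = 600 * 1000;  // 10 minutes, as above

  void incrementBlocksScheduled(StorageType t) { curr.add(t, 1); }

  /** Current plus previous window: an upper bound on in-flight writes. */
  int getBlocksScheduled(StorageType t) {
    return (int) (curr.get(t) + prev.get(t));
  }

  void roll(long now) {
    if (now - lastRollTime > ROLL_INTERVAL) {
      prev.set(curr);   // remember the last window
      curr.reset();     // start counting afresh
      lastRollTime = now;
    }
  }

  public static void main(String[] args) {
    ScheduledBlocksSketch d = new ScheduledBlocksSketch();
    d.incrementBlocksScheduled(StorageType.DISK);
    d.incrementBlocksScheduled(StorageType.DISK);
    System.out.println(d.getBlocksScheduled(StorageType.DISK));      // 2
    d.roll(System.currentTimeMillis());                              // curr -> prev
    System.out.println(d.getBlocksScheduled(StorageType.DISK));      // still 2
    d.roll(System.currentTimeMillis() + ROLL_INTERVAL + 1);          // prev ages out
    System.out.println(d.getBlocksScheduled(StorageType.DISK));      // 0
  }
}
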

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 4ddb7cc..58ca2ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -109,7 +109,7 @@ public class DatanodeStorageInfo {
 
   private long capacity;
   private long dfsUsed;
-  private long remaining;
+  private volatile long remaining;
   private long blockPoolUsed;
 
   private volatile BlockInfo blockList = null;
@@ -283,7 +283,7 @@ public class DatanodeStorageInfo {
   /** Increment the number of blocks scheduled for each given storage */ 
   public static void incrementBlocksScheduled(DatanodeStorageInfo... storages) {
     for (DatanodeStorageInfo s : storages) {
-      s.getDatanodeDescriptor().incrementBlocksScheduled();
+      s.getDatanodeDescriptor().incrementBlocksScheduled(s.getStorageType());
     }
   }
 
@@ -314,6 +314,26 @@ public class DatanodeStorageInfo {
         false, capacity, dfsUsed, remaining, blockPoolUsed);
   }
 
+  static Iterable<StorageType> toStorageTypes(
+      final Iterable<DatanodeStorageInfo> infos) {
+    return new Iterable<StorageType>() {
+        @Override
+        public Iterator<StorageType> iterator() {
+          return new Iterator<StorageType>() {
+            final Iterator<DatanodeStorageInfo> i = infos.iterator();
+            @Override
+            public boolean hasNext() {return i.hasNext();}
+            @Override
+            public StorageType next() {return i.next().getStorageType();}
+            @Override
+            public void remove() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+      };
+  }
+
   /** @return the first {@link DatanodeStorageInfo} corresponding to
    *          the given datanode
    */
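
The toStorageTypes() addition above returns a lazy, read-only view: no list of storage types is ever materialized, the underlying infos are consulted only as the caller iterates, and remove() is rejected so the view cannot mutate its source. A self-contained version of the same adapter, using simplified stand-in types rather than the HDFS classes:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class StorageTypeViewSketch {
  /** Simplified stand-ins for the HDFS StorageType/DatanodeStorageInfo pair. */
  enum StorageType { DISK, SSD }
  static class StorageInfo {
    private final StorageType type;
    StorageInfo(StorageType type) { this.type = type; }
    StorageType getStorageType() { return type; }
  }

  /** Read-only view: nothing is copied, and remove() is rejected. */
  static Iterable<StorageType> toStorageTypes(
      final Iterable<StorageInfo> infos) {
    return new Iterable<StorageType>() {
      @Override
      public Iterator<StorageType> iterator() {
        final Iterator<StorageInfo> i = infos.iterator();
        return new Iterator<StorageType>() {
          @Override
          public boolean hasNext() { return i.hasNext(); }
          @Override
          public StorageType next() { return i.next().getStorageType(); }
          @Override
          public void remove() { throw new UnsupportedOperationException(); }
        };
      }
    };
  }

  public static void main(String[] args) {
    List<StorageInfo> targets = Arrays.asList(
        new StorageInfo(StorageType.DISK), new StorageInfo(StorageType.SSD));
    for (StorageType t : toStorageTypes(targets)) {
      System.out.println(t);  // DISK then SSD, computed on the fly
    }
  }
}
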

http://git-wip-us.apache.org/repos/asf/hadoop/blob/022474c8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index a3195eb..5062270 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1930,7 +1930,9 @@ public class DataNode extends ReconfigurableBase
             + b + " (numBytes=" + b.getNumBytes() + ")"
             + ", stage=" + stage
             + ", clientname=" + clientname
-            + ", targets=" + Arrays.asList(targets));
+            + ", targets=" + Arrays.asList(targets)
+            + ", target storage types=" + (targetStorageTypes == null ? "[]" :
+            Arrays.asList(targetStorageTypes)));
       }
       this.targets = targets;
       this.targetStorageTypes = targetStorageTypes;
