http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
new file mode 100644
index 0000000..622b258
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicies.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BlockPlacementPolicies{
+
+  private final BlockPlacementPolicy replicationPolicy;
+  private final BlockPlacementPolicy ecPolicy;
+
+  public BlockPlacementPolicies(Configuration conf, FSClusterStats stats,
+                                NetworkTopology clusterMap,
+                                Host2NodesMap host2datanodeMap){
+    final Class<? extends BlockPlacementPolicy> replicatorClass = conf
+        .getClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+            DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
+            BlockPlacementPolicy.class);
+    replicationPolicy = ReflectionUtils.newInstance(replicatorClass, conf);
+    replicationPolicy.initialize(conf, stats, clusterMap, host2datanodeMap);
+    final Class<? extends BlockPlacementPolicy> blockPlacementECClass =
+        conf.getClass(DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY,
+            DFSConfigKeys.DFS_BLOCK_PLACEMENT_EC_CLASSNAME_DEFAULT,
+            BlockPlacementPolicy.class);
+    ecPolicy = ReflectionUtils.newInstance(blockPlacementECClass, conf);
+    ecPolicy.initialize(conf, stats, clusterMap, host2datanodeMap);
+  }
+
+  public BlockPlacementPolicy getPolicy(boolean isStriped){
+    if (isStriped) {
+      return ecPolicy;
+    } else {
+      return replicationPolicy;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 9696179..86aaf79 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -145,31 +145,7 @@ public abstract class BlockPlacementPolicy {
   abstract protected void initialize(Configuration conf,  FSClusterStats 
stats, 
                                      NetworkTopology clusterMap, 
                                      Host2NodesMap host2datanodeMap);
-    
-  /**
-   * Get an instance of the configured Block Placement Policy based on the
-   * the configuration property
-   * {@link  DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
-   * 
-   * @param conf the configuration to be used
-   * @param stats an object that is used to retrieve the load on the cluster
-   * @param clusterMap the network topology of the cluster
-   * @return an instance of BlockPlacementPolicy
-   */
-  public static BlockPlacementPolicy getInstance(Configuration conf, 
-                                                 FSClusterStats stats,
-                                                 NetworkTopology clusterMap,
-                                                 Host2NodesMap 
host2datanodeMap) {
-    final Class<? extends BlockPlacementPolicy> replicatorClass = 
conf.getClass(
-        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
-        DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
-        BlockPlacementPolicy.class);
-    final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
-        replicatorClass, conf);
-    replicator.initialize(conf, stats, clusterMap, host2datanodeMap);
-    return replicator;
-  }
-  
+
   /**
    * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
new file mode 100644
index 0000000..4dbf384
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolarent.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+
+import java.util.*;
+
+/**
+ * The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The strategy is that it tries its best to place the replicas to most racks.
+ */
+@InterfaceAudience.Private
+public class BlockPlacementPolicyRackFaultTolarent extends 
BlockPlacementPolicyDefault {
+
+  @Override
+  protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = numOfChosen + numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+    // No calculation needed when there is only one rack or picking one node.
+    int numOfRacks = clusterMap.getNumOfRacks();
+    if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
+      return new int[] {numOfReplicas, totalNumOfReplicas};
+    }
+    if(totalNumOfReplicas<numOfRacks){
+      return new int[] {numOfReplicas, 1};
+    }
+    int maxNodesPerRack = (totalNumOfReplicas - 1) / numOfRacks + 1;
+    return new int[] {numOfReplicas, maxNodesPerRack};
+  }
+
+  /**
+   * Choose numOfReplicas in order:
+   * 1. If total replica expected is less than numOfRacks in cluster, it choose
+   * randomly.
+   * 2. If total replica expected is bigger than numOfRacks, it choose:
+   *  2a. Fill each rack exactly (maxNodesPerRack-1) replicas.
+   *  2b. For some random racks, place one more replica to each one of them, 
until
+   *  numOfReplicas have been chosen. <br>
+   * In the end, the difference of the numbers of replicas for each two racks
+   * is no more than 1.
+   * Either way it always prefer local storage.
+   * @return local node of writer
+   */
+  @Override
+  protected Node chooseTargetInOrder(int numOfReplicas,
+                                 Node writer,
+                                 final Set<Node> excludedNodes,
+                                 final long blocksize,
+                                 final int maxNodesPerRack,
+                                 final List<DatanodeStorageInfo> results,
+                                 final boolean avoidStaleNodes,
+                                 final boolean newBlock,
+                                 EnumMap<StorageType, Integer> storageTypes)
+                                 throws NotEnoughReplicasException {
+    int totalReplicaExpected = results.size() + numOfReplicas;
+    int numOfRacks = clusterMap.getNumOfRacks();
+    if (totalReplicaExpected < numOfRacks ||
+        totalReplicaExpected % numOfRacks == 0) {
+      writer = chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+      return writer;
+    }
+
+    assert totalReplicaExpected > (maxNodesPerRack -1) * numOfRacks;
+
+    // Calculate numOfReplicas for filling each rack exactly 
(maxNodesPerRack-1)
+    // replicas.
+    HashMap<String, Integer> rackCounts = new HashMap<>();
+    for (DatanodeStorageInfo dsInfo : results) {
+      String rack = dsInfo.getDatanodeDescriptor().getNetworkLocation();
+      Integer count = rackCounts.get(rack);
+      if (count != null) {
+        rackCounts.put(rack, count + 1);
+      } else {
+        rackCounts.put(rack, 1);
+      }
+    }
+    int excess = 0; // Sum of the above (maxNodesPerRack-1) part of nodes in 
results
+    for (int count : rackCounts.values()) {
+      if (count > maxNodesPerRack -1) {
+        excess += count - (maxNodesPerRack -1);
+      }
+    }
+    numOfReplicas = Math.min(totalReplicaExpected - results.size(),
+        (maxNodesPerRack -1) * numOfRacks - (results.size() - excess));
+
+    // Fill each rack exactly (maxNodesPerRack-1) replicas.
+    writer = chooseOnce(numOfReplicas, writer, new HashSet<>(excludedNodes),
+        blocksize, maxNodesPerRack -1, results, avoidStaleNodes, storageTypes);
+
+    for (DatanodeStorageInfo resultStorage : results) {
+      addToExcludedNodes(resultStorage.getDatanodeDescriptor(), excludedNodes);
+    }
+
+    // For some racks, place one more replica to each one of them.
+    numOfReplicas = totalReplicaExpected - results.size();
+    chooseOnce(numOfReplicas, writer, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+
+    return writer;
+  }
+
+  /**
+   * Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
+   * Except that 1st replica prefer local storage.
+   * @return local node of writer.
+   */
+  private Node chooseOnce(int numOfReplicas,
+                            Node writer,
+                            final Set<Node> excludedNodes,
+                            final long blocksize,
+                            final int maxNodesPerRack,
+                            final List<DatanodeStorageInfo> results,
+                            final boolean avoidStaleNodes,
+                            EnumMap<StorageType, Integer> storageTypes)
+                            throws NotEnoughReplicasException {
+    if (numOfReplicas == 0) {
+      return writer;
+    }
+    writer = chooseLocalStorage(writer, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
+        .getDatanodeDescriptor();
+    if (--numOfReplicas == 0) {
+      return writer;
+    }
+    chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
+    return writer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
index 0dbf485..0e0fd91 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
@@ -43,8 +43,15 @@ class BlocksMap {
 
     @Override
     public boolean hasNext() {
-      return blockInfo != null && nextIdx < blockInfo.getCapacity()
-              && blockInfo.getDatanode(nextIdx) != null;
+      if (blockInfo == null) {
+        return false;
+      }
+      while (nextIdx < blockInfo.getCapacity() &&
+          blockInfo.getDatanode(nextIdx) == null) {
+        // note that for striped blocks there may be null in the triplets
+        nextIdx++;
+      }
+      return nextIdx < blockInfo.getCapacity();
     }
 
     @Override
@@ -123,12 +130,16 @@ class BlocksMap {
       return;
 
     blockInfo.setBlockCollection(null);
-    for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
+    final int size = blockInfo instanceof BlockInfoContiguous ?
+        blockInfo.numNodes() : blockInfo.getCapacity();
+    for(int idx = size - 1; idx >= 0; idx--) {
       DatanodeDescriptor dn = blockInfo.getDatanode(idx);
-      dn.removeBlock(blockInfo); // remove from the list and wipe the location
+      if (dn != null) {
+        dn.removeBlock(blockInfo); // remove from the list and wipe the 
location
+      }
     }
   }
-  
+
   /** Returns the block object it it exists in the map. */
   BlockInfo getStoredBlock(Block b) {
     return blocks.get(b);
@@ -190,8 +201,8 @@ class BlocksMap {
     // remove block from the data-node list and the node from the block info
     boolean removed = node.removeBlock(info);
 
-    if (info.getDatanode(0) == null     // no datanodes left
-              && info.isDeleted()) {  // does not belong to a file
+    if (info.hasNoStorage()    // no datanodes left
+        && info.isDeleted()) { // does not belong to a file
       blocks.remove(b);  // remove block from the map
     }
     return removed;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 7e12a99..3cf9db6 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -31,8 +31,8 @@ import java.util.Queue;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import com.google.common.collect.ImmutableList;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -41,13 +41,16 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import 
org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.hdfs.util.EnumCounters;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
@@ -220,9 +223,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** A queue of blocks to be replicated by this datanode */
   private final BlockQueue<BlockTargetPair> replicateBlocks =
       new BlockQueue<>();
+  /** A queue of blocks to be erasure coded by this datanode */
+  private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks =
+      new BlockQueue<>();
   /** A queue of blocks to be recovered by this datanode */
-  private final BlockQueue<BlockInfoContiguousUnderConstruction> recoverBlocks 
=
-                                new 
BlockQueue<BlockInfoContiguousUnderConstruction>();
+  private final BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
+      new BlockQueue<>();
   /** A set of blocks to be invalidated by this datanode */
   private final LightWeightHashSet<Block> invalidateBlocks =
       new LightWeightHashSet<>();
@@ -280,7 +286,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  DatanodeStorageInfo[] getStorageInfos() {
+  @VisibleForTesting
+  public DatanodeStorageInfo[] getStorageInfos() {
     synchronized (storageMap) {
       final Collection<DatanodeStorageInfo> storages = storageMap.values();
       return storages.toArray(new DatanodeStorageInfo[storages.size()]);
@@ -377,6 +384,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
       this.invalidateBlocks.clear();
       this.recoverBlocks.clear();
       this.replicateBlocks.clear();
+      this.erasurecodeBlocks.clear();
     }
     // pendingCached, cached, and pendingUncached are protected by the
     // FSN lock.
@@ -577,6 +585,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(getStorageInfos());
   }
+
   Iterator<BlockInfo> getBlockIterator(final String storageID) {
     return new BlockIterator(getStorageInfo(storageID));
   }
@@ -598,9 +607,23 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   /**
+   * Store block erasure coding work.
+   */
+  void addBlockToBeErasureCoded(ExtendedBlock block,
+      DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets,
+      short[] liveBlockIndices, ECSchema ecSchema, int cellSize) {
+    assert (block != null && sources != null && sources.length > 0);
+    BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets,
+        liveBlockIndices, ecSchema, cellSize);
+    erasurecodeBlocks.offer(task);
+    BlockManager.LOG.debug("Adding block recovery task " + task + "to "
+        + getName() + ", current queue size is " + erasurecodeBlocks.size());
+  }
+
+  /**
    * Store block recovery work.
    */
-  void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) {
+  void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
     if(recoverBlocks.contains(block)) {
       // this prevents adding the same block twice to the recovery queue
       BlockManager.LOG.info(block + " is already in the recovery queue");
@@ -629,6 +652,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
 
   /**
+   * The number of work items that are pending to be replicated
+   */
+  @VisibleForTesting
+  public int getNumberOfBlocksToBeErasureCoded() {
+    return erasurecodeBlocks.size();
+  }
+
+  /**
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
    */
@@ -642,11 +673,15 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return replicateBlocks.poll(maxTransfers);
   }
 
-  public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int 
maxTransfers) {
-    List<BlockInfoContiguousUnderConstruction> blocks = 
recoverBlocks.poll(maxTransfers);
+  public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) {
+    return erasurecodeBlocks.poll(maxTransfers);
+  }
+
+  public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int 
maxTransfers) {
+    List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
       return null;
-    return blocks.toArray(new 
BlockInfoContiguousUnderConstruction[blocks.size()]);
+    return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]);
   }
 
   /**
@@ -660,6 +695,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  @VisibleForTesting
+  public boolean containsInvalidateBlock(Block block) {
+    synchronized (invalidateBlocks) {
+      return invalidateBlocks.contains(block);
+    }
+  }
+
   /**
    * @return Approximate number of blocks currently scheduled to be written 
    */
@@ -842,6 +884,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
     if (repl > 0) {
       sb.append(" ").append(repl).append(" blocks to be replicated;");
     }
+    int ec = erasurecodeBlocks.size();
+    if(ec > 0) {
+      sb.append(" ").append(ec).append(" blocks to be erasure coded;");
+    }
     int inval = invalidateBlocks.size();
     if (inval > 0) {
       sb.append(" ").append(inval).append(" blocks to be invalidated;");      

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 3397bbb..dbd07d4 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.*;
+import 
org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
@@ -1380,29 +1381,29 @@ public class DatanodeManager {
         }
 
         //check lease recovery
-        BlockInfoContiguousUnderConstruction[] blocks = nodeinfo
+        BlockInfoUnderConstruction[] blocks = nodeinfo
             .getLeaseRecoveryCommand(Integer.MAX_VALUE);
         if (blocks != null) {
           BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
               blocks.length);
-          for (BlockInfoContiguousUnderConstruction b : blocks) {
+          for (BlockInfoUnderConstruction b : blocks) {
             final DatanodeStorageInfo[] storages = 
b.getExpectedStorageLocations();
             // Skip stale nodes during recovery - not heart beated for some 
time (30s by default).
             final List<DatanodeStorageInfo> recoveryLocations =
                 new ArrayList<>(storages.length);
-            for (int i = 0; i < storages.length; i++) {
-              if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) 
{
-                recoveryLocations.add(storages[i]);
+            for (DatanodeStorageInfo storage : storages) {
+              if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
+                recoveryLocations.add(storage);
               }
             }
             // If we are performing a truncate recovery than set recovery 
fields
             // to old block.
             boolean truncateRecovery = b.getTruncateBlock() != null;
             boolean copyOnTruncateRecovery = truncateRecovery &&
-                b.getTruncateBlock().getBlockId() != b.getBlockId();
+                b.getTruncateBlock().getBlockId() != b.toBlock().getBlockId();
             ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
                 new ExtendedBlock(blockPoolId, b.getTruncateBlock()) :
-                new ExtendedBlock(blockPoolId, b);
+                new ExtendedBlock(blockPoolId, b.toBlock());
             // If we only get 1 replica after eliminating stale nodes, then 
choose all
             // replicas for recovery and let the primary data node handle 
failures.
             DatanodeInfo[] recoveryInfos;
@@ -1419,7 +1420,7 @@ public class DatanodeManager {
               recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
             }
             if(truncateRecovery) {
-              Block recoveryBlock = (copyOnTruncateRecovery) ? b :
+              Block recoveryBlock = (copyOnTruncateRecovery) ? b.toBlock() :
                   b.getTruncateBlock();
               brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
                                                 recoveryBlock));
@@ -1439,6 +1440,13 @@ public class DatanodeManager {
           cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
               pendingList));
         }
+        // checking pending erasure coding tasks
+        List<BlockECRecoveryInfo> pendingECList =
+            nodeinfo.getErasureCodeCommand(maxTransfers);
+        if (pendingECList != null) {
+          cmds.add(new 
BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
+              pendingECList));
+        }
         //check block invalidation
         Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
         if (blks != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 216d6d2..bdf9f9f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -24,6 +24,7 @@ import java.util.List;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -209,6 +210,7 @@ public class DatanodeStorageInfo {
     return getState() == State.FAILED && numBlocks != 0;
   }
 
+  @VisibleForTesting
   public String getStorageID() {
     return storageID;
   }
@@ -233,7 +235,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  public AddBlockResult addBlock(BlockInfo b) {
+  public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
     // First check whether the block belongs to a different storage
     // on the same DN.
     AddBlockResult result = AddBlockResult.ADDED;
@@ -252,10 +254,18 @@ public class DatanodeStorageInfo {
     }
 
     // add to the head of the data-node list
-    b.addStorage(this);
+    b.addStorage(this, reportedBlock);
+    insertToList(b);
+    return result;
+  }
+
+  AddBlockResult addBlock(BlockInfo b) {
+    return addBlock(b, b);
+  }
+
+  public void insertToList(BlockInfo b) {
     blockList = b.listInsert(blockList, this);
     numBlocks++;
-    return result;
   }
 
   public boolean removeBlock(BlockInfo b) {
@@ -274,7 +284,6 @@ public class DatanodeStorageInfo {
   
   Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(blockList);
-
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 797d031..5e3cac2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -36,7 +36,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
@@ -234,14 +233,14 @@ public class DecommissionManager {
   }
 
   /**
-   * Checks whether a block is sufficiently replicated for decommissioning.
-   * Full-strength replication is not always necessary, hence "sufficient".
+   * Checks whether a block is sufficiently replicated/stored for
+   * decommissioning. For replicated blocks or striped blocks, full-strength
+   * replication or storage is not always necessary, hence "sufficient".
    * @return true if sufficient, else false.
    */
-  private boolean isSufficientlyReplicated(BlockInfo block,
-      BlockCollection bc,
+  private boolean isSufficient(BlockInfo block, BlockCollection bc,
       NumberReplicas numberReplicas) {
-    final int numExpected = bc.getPreferredBlockReplication();
+    final int numExpected = blockManager.getExpectedReplicaNum(bc, block);
     final int numLive = numberReplicas.liveReplicas();
     if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
       // Block doesn't need replication. Skip.
@@ -255,18 +254,19 @@ public class DecommissionManager {
     if (numExpected > numLive) {
       if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
         // Can decom a UC block as long as there will still be minReplicas
-        if (numLive >= blockManager.minReplication) {
+        if (blockManager.hasMinStorage(block, numLive)) {
           LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
-              + ">= minR ({})", block, numLive, blockManager.minReplication);
+              + ">= minR ({})", block, numLive,
+              blockManager.getMinStorageNum(block));
           return true;
         } else {
           LOG.trace("UC block {} insufficiently-replicated since numLive "
               + "({}) < minR ({})", block, numLive,
-              blockManager.minReplication);
+              blockManager.getMinStorageNum(block));
         }
       } else {
         // Can decom a non-UC as long as the default replication is met
-        if (numLive >= blockManager.defaultReplication) {
+        if (numLive >= blockManager.getDefaultStorageNum(block)) {
           return true;
         }
       }
@@ -274,11 +274,11 @@ public class DecommissionManager {
     return false;
   }
 
-  private static void logBlockReplicationInfo(Block block, BlockCollection bc,
+  private void logBlockReplicationInfo(BlockInfo block, BlockCollection bc,
       DatanodeDescriptor srcNode, NumberReplicas num,
       Iterable<DatanodeStorageInfo> storages) {
     int curReplicas = num.liveReplicas();
-    int curExpectedReplicas = bc.getPreferredBlockReplication();
+    int curExpectedReplicas = blockManager.getExpectedReplicaNum(bc, block);
     StringBuilder nodeList = new StringBuilder();
     for (DatanodeStorageInfo storage : storages) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
@@ -407,14 +407,14 @@ public class DecommissionManager {
           // that are insufficiently replicated for further tracking
           LOG.debug("Newly-added node {}, doing full scan to find " +
               "insufficiently-replicated blocks.", dn);
-          blocks = handleInsufficientlyReplicated(dn);
+          blocks = handleInsufficientlyStored(dn);
           decomNodeBlocks.put(dn, blocks);
           fullScan = true;
         } else {
           // This is a known datanode, check if its # of insufficiently 
           // replicated blocks has dropped to zero and if it can be decommed
           LOG.debug("Processing decommission-in-progress node {}", dn);
-          pruneSufficientlyReplicated(dn, blocks);
+          pruneReliableBlocks(dn, blocks);
         }
         if (blocks.size() == 0) {
           if (!fullScan) {
@@ -426,7 +426,7 @@ public class DecommissionManager {
             // marking the datanode as decommissioned 
             LOG.debug("Node {} has finished replicating current set of "
                 + "blocks, checking with the full block map.", dn);
-            blocks = handleInsufficientlyReplicated(dn);
+            blocks = handleInsufficientlyStored(dn);
             decomNodeBlocks.put(dn, blocks);
           }
           // If the full scan is clean AND the node liveness is okay, 
@@ -467,25 +467,23 @@ public class DecommissionManager {
     }
 
     /**
-     * Removes sufficiently replicated blocks from the block list of a 
-     * datanode.
+     * Removes reliable blocks from the block list of a datanode.
      */
-    private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
+    private void pruneReliableBlocks(final DatanodeDescriptor datanode,
         AbstractList<BlockInfo> blocks) {
       processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
     }
 
     /**
-     * Returns a list of blocks on a datanode that are insufficiently 
-     * replicated, i.e. are under-replicated enough to prevent decommission.
+     * Returns a list of blocks on a datanode that are insufficiently 
replicated
+     * or require recovery, i.e. requiring recovery and should prevent
+     * decommission.
      * <p/>
-     * As part of this, it also schedules replication work for 
-     * any under-replicated blocks.
+     * As part of this, it also schedules replication/recovery work.
      *
-     * @param datanode
-     * @return List of insufficiently replicated blocks 
+     * @return List of blocks requiring recovery
      */
-    private AbstractList<BlockInfo> handleInsufficientlyReplicated(
+    private AbstractList<BlockInfo> handleInsufficientlyStored(
         final DatanodeDescriptor datanode) {
       AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
       processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
@@ -496,24 +494,22 @@ public class DecommissionManager {
     /**
      * Used while checking if decommission-in-progress datanodes can be marked
      * as decommissioned. Combines shared logic of 
-     * pruneSufficientlyReplicated and handleInsufficientlyReplicated.
+     * pruneReliableBlocks and handleInsufficientlyStored.
      *
      * @param datanode                    Datanode
      * @param it                          Iterator over the blocks on the
      *                                    datanode
-     * @param insufficientlyReplicated    Return parameter. If it's not null,
+     * @param insufficientList            Return parameter. If it's not null,
      *                                    will contain the insufficiently
      *                                    replicated-blocks from the list.
-     * @param pruneSufficientlyReplicated whether to remove sufficiently
-     *                                    replicated blocks from the iterator
-     * @return true if there are under-replicated blocks in the provided block
-     * iterator, else false.
+     * @param pruneReliableBlocks         whether to remove blocks reliable
+     *                                    enough from the iterator
      */
     private void processBlocksForDecomInternal(
         final DatanodeDescriptor datanode,
         final Iterator<BlockInfo> it,
-        final List<BlockInfo> insufficientlyReplicated,
-        boolean pruneSufficientlyReplicated) {
+        final List<BlockInfo> insufficientList,
+        boolean pruneReliableBlocks) {
       boolean firstReplicationLog = true;
       int underReplicatedBlocks = 0;
       int decommissionOnlyReplicas = 0;
@@ -528,7 +524,7 @@ public class DecommissionManager {
           it.remove();
           continue;
         }
-        BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+        BlockCollection bc = blockManager.getBlockCollection(block);
         if (bc == null) {
           // Orphan block, will be invalidated eventually. Skip.
           continue;
@@ -536,35 +532,34 @@ public class DecommissionManager {
 
         final NumberReplicas num = blockManager.countNodes(block);
         final int liveReplicas = num.liveReplicas();
-        final int curReplicas = liveReplicas;
 
         // Schedule under-replicated blocks for replication if not already
         // pending
         if (blockManager.isNeededReplication(block,
-            bc.getPreferredBlockReplication(), liveReplicas)) {
+            blockManager.getExpectedReplicaNum(bc, block), liveReplicas)) {
           if (!blockManager.neededReplications.contains(block) &&
               blockManager.pendingReplications.getNumReplicas(block) == 0 &&
               namesystem.isPopulatingReplQueues()) {
             // Process these blocks only when active NN is out of safe mode.
             blockManager.neededReplications.add(block,
-                curReplicas,
+                liveReplicas,
                 num.decommissionedAndDecommissioning(),
-                bc.getPreferredBlockReplication());
+                blockManager.getExpectedReplicaNum(bc, block));
           }
         }
 
         // Even if the block is under-replicated, 
-        // it doesn't block decommission if it's sufficiently replicated 
-        if (isSufficientlyReplicated(block, bc, num)) {
-          if (pruneSufficientlyReplicated) {
+        // it doesn't block decommission if it's sufficiently replicated
+        if (isSufficient(block, bc, num)) {
+          if (pruneReliableBlocks) {
             it.remove();
           }
           continue;
         }
 
         // We've found an insufficiently replicated block.
-        if (insufficientlyReplicated != null) {
-          insufficientlyReplicated.add(block);
+        if (insufficientList != null) {
+          insufficientList.add(block);
         }
         // Log if this is our first time through
         if (firstReplicationLog) {
@@ -577,7 +572,7 @@ public class DecommissionManager {
         if (bc.isUnderConstruction()) {
           underReplicatedInOpenFiles++;
         }
-        if ((curReplicas == 0) && (num.decommissionedAndDecommissioning() > 
0)) {
+        if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 
0)) {
           decommissionOnlyReplicas++;
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java
new file mode 100644
index 0000000..f4600cb7
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+
+/**
+ * ReplicaUnderConstruction contains information about replicas (or blocks
+ * belonging to a block group) while they are under construction.
+ *
+ * The GS, the length and the state of the replica is as reported by the
+ * datanode.
+ *
+ * It is not guaranteed, but expected, that datanodes actually have
+ * corresponding replicas.
+ */
+class ReplicaUnderConstruction extends Block {
+  private final DatanodeStorageInfo expectedLocation;
+  private HdfsServerConstants.ReplicaState state;
+  private boolean chosenAsPrimary;
+
+  ReplicaUnderConstruction(Block block,
+      DatanodeStorageInfo target,
+      HdfsServerConstants.ReplicaState state) {
+    super(block);
+    this.expectedLocation = target;
+    this.state = state;
+    this.chosenAsPrimary = false;
+  }
+
+  /**
+   * Expected block replica location as assigned when the block was allocated.
+   * This defines the pipeline order.
+   * It is not guaranteed, but expected, that the data-node actually has
+   * the replica.
+   */
+  DatanodeStorageInfo getExpectedStorageLocation() {
+    return expectedLocation;
+  }
+
+  /**
+   * Get replica state as reported by the data-node.
+   */
+  HdfsServerConstants.ReplicaState getState() {
+    return state;
+  }
+
+  /**
+   * Whether the replica was chosen for recovery.
+   */
+  boolean getChosenAsPrimary() {
+    return chosenAsPrimary;
+  }
+
+  /**
+   * Set replica state.
+   */
+  void setState(HdfsServerConstants.ReplicaState s) {
+    state = s;
+  }
+
+  /**
+   * Set whether this replica was chosen for recovery.
+   */
+  void setChosenAsPrimary(boolean chosenAsPrimary) {
+    this.chosenAsPrimary = chosenAsPrimary;
+  }
+
+  /**
+   * Is data-node the replica belongs to alive.
+   */
+  boolean isAlive() {
+    return expectedLocation.getDatanodeDescriptor().isAlive;
+  }
+
+  @Override // Block
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override // Block
+  public boolean equals(Object obj) {
+    // Sufficient to rely on super's implementation
+    return (this == obj) || super.equals(obj);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(50);
+    appendStringTo(b);
+    return b.toString();
+  }
+
+  @Override
+  public void appendStringTo(StringBuilder sb) {
+    sb.append("ReplicaUC[")
+        .append(expectedLocation)
+        .append("|")
+        .append(state)
+        .append("]");
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
new file mode 100644
index 0000000..479ee4c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.util.SequentialNumber;
+
+import static 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK;
+import static 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_BLOCKS_IN_GROUP;
+
+/**
+ * Generate the next valid block group ID by incrementing the maximum block
+ * group ID allocated so far, with the first 2^10 block group IDs reserved.
+ * HDFS-EC introduces a hierarchical protocol to name blocks and groups:
+ * Contiguous: {reserved block IDs | flag | block ID}
+ * Striped: {reserved block IDs | flag | block group ID | index in group}
+ *
+ * Following n bits of reserved block IDs, The (n+1)th bit in an ID
+ * distinguishes contiguous (0) and striped (1) blocks. For a striped block,
+ * bits (n+2) to (64-m) represent the ID of its block group, while the last m
+ * bits represent its index of the group. The value m is determined by the
+ * maximum number of blocks in a group (MAX_BLOCKS_IN_GROUP).
+ *
+ * Note that the {@link #nextValue()} methods requires external lock to
+ * guarantee IDs have no conflicts.
+ */
+@InterfaceAudience.Private
+public class SequentialBlockGroupIdGenerator extends SequentialNumber {
+
+  private final BlockManager blockManager;
+
+  SequentialBlockGroupIdGenerator(BlockManager blockManagerRef) {
+    super(Long.MIN_VALUE);
+    this.blockManager = blockManagerRef;
+  }
+
+  @Override // NumberGenerator
+  public long nextValue() {
+    skipTo((getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK) + 
MAX_BLOCKS_IN_GROUP);
+    // Make sure there's no conflict with existing random block IDs
+    final Block b = new Block(getCurrentValue());
+    while (hasValidBlockInRange(b)) {
+      skipTo(getCurrentValue() + MAX_BLOCKS_IN_GROUP);
+      b.setBlockId(getCurrentValue());
+    }
+    if (b.getBlockId() >= 0) {
+      throw new IllegalStateException("All negative block group IDs are used, "
+          + "growing into positive IDs, "
+          + "which might conflict with non-erasure coded blocks.");
+    }
+    return getCurrentValue();
+  }
+
+  /**
+   * @param b A block object whose id is set to the starting point for check
+   * @return true if any ID in the range
+   *      {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a file
+   */
+  private boolean hasValidBlockInRange(Block b) {
+    final long id = b.getBlockId();
+    for (int i = 0; i < MAX_BLOCKS_IN_GROUP; i++) {
+      b.setBlockId(id + i);
+      if (blockManager.getBlockCollection(b) != null) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
index eef8857..6074784 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.util.SequentialNumber;
 
 /**
@@ -54,6 +53,11 @@ public class SequentialBlockIdGenerator extends 
SequentialNumber {
     while(isValidBlock(b)) {
       b.setBlockId(super.nextValue());
     }
+    if (b.getBlockId() < 0) {
+      throw new IllegalStateException("All positive block IDs are used, " +
+          "wrapping to negative IDs, " +
+          "which might conflict with erasure coded block groups.");
+    }
     return b.getBlockId();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index ebc15b8..7e8f479 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
  *
  * <p/>
  * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
  * </p>
  * <p>The queue order is as follows:</p>
  * <ol>
@@ -145,14 +145,28 @@ class UnderReplicatedBlocks implements 
Iterable<BlockInfo> {
    * @param expectedReplicas expected number of replicas of the block
    * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
    */
-  private int getPriority(int curReplicas,
+  private int getPriority(BlockInfo block,
+                          int curReplicas,
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
       // Block has enough copies, but not enough racks
       return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
-    } else if (curReplicas == 0) {
+    }
+    if (block.isStriped()) {
+      BlockInfoStriped sblk = (BlockInfoStriped) block;
+      return getPriorityStriped(curReplicas, decommissionedReplicas,
+          sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
+    } else {
+      return getPriorityContiguous(curReplicas, decommissionedReplicas,
+          expectedReplicas);
+    }
+  }
+
+  private int getPriorityContiguous(int curReplicas, int 
decommissionedReplicas,
+      int expectedReplicas) {
+    if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
@@ -161,7 +175,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
       //all we have are corrupt blocks
       return QUEUE_WITH_CORRUPT_BLOCKS;
     } else if (curReplicas == 1) {
-      //only on replica -risk of loss
+      // only one replica, highest risk of loss
       // highest priority
       return QUEUE_HIGHEST_PRIORITY;
     } else if ((curReplicas * 3) < expectedReplicas) {
@@ -174,6 +188,27 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> 
{
     }
   }
 
+  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+      short dataBlkNum, short parityBlkNum) {
+    if (curReplicas < dataBlkNum) {
+      // There are some replicas on decommissioned nodes so it's not corrupted
+      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+        return QUEUE_HIGHEST_PRIORITY;
+      }
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == dataBlkNum) {
+      // highest risk of loss, highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
+      // can only afford one replica loss
+      // this is considered very under-replicated
+      return QUEUE_VERY_UNDER_REPLICATED;
+    } else {
+      // add to the normal queue for under replicated blocks
+      return QUEUE_UNDER_REPLICATED;
+    }
+  }
+
   /** add a block to a under replication queue according to its priority
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
@@ -186,7 +221,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                            int decomissionedReplicas,
                            int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
-    int priLevel = getPriority(curReplicas, decomissionedReplicas,
+    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
                                expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
@@ -209,7 +244,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                               int oldReplicas,
                               int decommissionedReplicas,
                               int oldExpectedReplicas) {
-    int priLevel = getPriority(oldReplicas,
+    int priLevel = getPriority(block, oldReplicas,
                                decommissionedReplicas,
                                oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
@@ -283,9 +318,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
                            int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(curReplicas, decommissionedReplicas,
+    int curPri = getPriority(block, curReplicas, decommissionedReplicas,
         curExpectedReplicas);
-    int oldPri = getPriority(oldReplicas, decommissionedReplicas,
+    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
         oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 11194dc..eb45aa2 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -393,4 +393,9 @@ public interface HdfsServerConstants {
       "raw.hdfs.crypto.file.encryption.info";
   String SECURITY_XATTR_UNREADABLE_BY_SUPERUSER =
       "security.hdfs.unreadable.by.superuser";
+  String XATTR_ERASURECODING_ZONE =
+      "raw.hdfs.erasurecoding.zone";
+
+  long BLOCK_GROUP_INDEX_MASK = 15;
+  byte MAX_BLOCKS_IN_GROUP = 16;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 92323f1..d77b36d 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -32,11 +32,13 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
 import 
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.*;
+import 
org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import 
org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -722,6 +724,11 @@ class BPOfferService {
         dxcs.balanceThrottler.setBandwidth(bandwidth);
       }
       break;
+    case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
+      LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY");
+      Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) 
cmd).getECTasks();
+      dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
@@ -751,6 +758,7 @@ class BPOfferService {
     case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
     case DatanodeProtocol.DNA_CACHE:
     case DatanodeProtocol.DNA_UNCACHE:
+    case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY:
       LOG.warn("Got a command from standby NN - ignoring command:" + 
cmd.getAction());
       break;
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index abc9390..85f194a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -245,6 +245,33 @@ public class DNConf {
   }
 
   /**
+   * Returns true if connect to datanode via hostname
+   * 
+   * @return boolean true if connect to datanode via hostname
+   */
+  public boolean getConnectToDnViaHostname() {
+    return connectToDnViaHostname;
+  }
+
+  /**
+   * Returns socket timeout
+   * 
+   * @return int socket timeout
+   */
+  public int getSocketTimeout() {
+    return socketTimeout;
+  }
+
+  /**
+   * Returns socket write timeout
+   * 
+   * @return int socket write timeout
+   */
+  public int getSocketWriteTimeout() {
+    return socketWriteTimeout;
+  }
+
+  /**
    * Returns the SaslPropertiesResolver configured for use with
    * DataTransferProtocol, or null if not configured.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index e265dad..1b695e3 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -87,6 +87,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -151,6 +152,7 @@ import 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import 
org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -358,6 +360,8 @@ public class DataNode extends ReconfigurableBase
   private String dnUserName = null;
 
   private SpanReceiverHost spanReceiverHost;
+
+  private ErasureCodingWorker ecWorker;
   private static final int NUM_CORES = Runtime.getRuntime()
       .availableProcessors();
   private static final double CONGESTION_RATIO = 1.5;
@@ -1159,6 +1163,8 @@ public class DataNode extends ReconfigurableBase
     saslClient = new SaslDataTransferClient(dnConf.conf, 
         dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
     saslServer = new SaslDataTransferServer(dnConf, 
blockPoolTokenSecretManager);
+    // Initialize ErasureCoding worker
+    ecWorker = new ErasureCodingWorker(conf, this);
   }
 
   /**
@@ -1223,6 +1229,10 @@ public class DataNode extends ReconfigurableBase
     return UUID.randomUUID().toString();
   }
 
+  public SaslDataTransferClient getSaslClient() {
+    return saslClient;
+  }
+
   /**
    * Verify that the DatanodeUuid has been initialized. If this is a new
    * datanode then we generate a new Datanode Uuid and persist it to disk.
@@ -1485,7 +1495,7 @@ public class DataNode extends ReconfigurableBase
   /**
    * Creates either NIO or regular depending on socketWriteTimeout.
    */
-  protected Socket newSocket() throws IOException {
+  public Socket newSocket() throws IOException {
     return (dnConf.socketWriteTimeout > 0) ? 
            SocketChannel.open().socket() : new Socket();                       
            
   }
@@ -1894,6 +1904,21 @@ public class DataNode extends ReconfigurableBase
   int getXmitsInProgress() {
     return xmitsInProgress.get();
   }
+  
+  /**
+   * Increments the xmitsInProgress count. xmitsInProgress count represents the
+   * number of data replication/reconstruction tasks running currently.
+   */
+  public void incrementXmitsInProgress() {
+    xmitsInProgress.getAndIncrement();
+  }
+
+  /**
+   * Decrements the xmitsInProgress count
+   */
+  public void decrementXmitsInProgress() {
+    xmitsInProgress.getAndDecrement();
+  }
 
   private void reportBadBlock(final BPOfferService bpos,
       final ExtendedBlock block, final String msg) {
@@ -2113,7 +2138,7 @@ public class DataNode extends ReconfigurableBase
      */
     @Override
     public void run() {
-      xmitsInProgress.getAndIncrement();
+      incrementXmitsInProgress();
       Socket sock = null;
       DataOutputStream out = null;
       DataInputStream in = null;
@@ -2133,11 +2158,8 @@ public class DataNode extends ReconfigurableBase
         //
         // Header info
         //
-        Token<BlockTokenIdentifier> accessToken = 
BlockTokenSecretManager.DUMMY_TOKEN;
-        if (isBlockTokenEnabled) {
-          accessToken = blockPoolTokenSecretManager.generateToken(b, 
-              EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
-        }
+        Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b, 
+            EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
 
         long writeTimeout = dnConf.socketWriteTimeout + 
                             HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * 
(targets.length-1);
@@ -2195,7 +2217,7 @@ public class DataNode extends ReconfigurableBase
         // check if there are any disk problem
         checkDiskErrorAsync();
       } finally {
-        xmitsInProgress.getAndDecrement();
+        decrementXmitsInProgress();
         IOUtils.closeStream(blockSender);
         IOUtils.closeStream(out);
         IOUtils.closeStream(in);
@@ -2204,6 +2226,19 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
+  /***
+   * Use BlockTokenSecretManager to generate block token for current user.
+   */
+  public Token<BlockTokenIdentifier> getBlockAccessToken(ExtendedBlock b,
+      EnumSet<AccessMode> mode) throws IOException {
+    Token<BlockTokenIdentifier> accessToken = 
+        BlockTokenSecretManager.DUMMY_TOKEN;
+    if (isBlockTokenEnabled) {
+      accessToken = blockPoolTokenSecretManager.generateToken(b, mode);
+    }
+    return accessToken;
+  }
+
   /**
    * Returns a new DataEncryptionKeyFactory that generates a key from the
    * BlockPoolTokenSecretManager, using the block pool ID of the given block.
@@ -2211,7 +2246,7 @@ public class DataNode extends ReconfigurableBase
    * @param block for which the factory needs to create a key
    * @return DataEncryptionKeyFactory for block's block pool ID
    */
-  DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
+  public DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock(
       final ExtendedBlock block) {
     return new DataEncryptionKeyFactory() {
       @Override
@@ -3259,4 +3294,9 @@ public class DataNode extends ReconfigurableBase
     checkSuperuserPrivilege();
     spanReceiverHost.removeSpanReceiver(id);
   }
+  
+  public ErasureCodingWorker getErasureCodingWorker(){
+    return ecWorker;
+    
+  }
 }

Reply via email to