http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
new file mode 100644
index 0000000..9285fd7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestRecoverStripedFile {
+  public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class);
+  
+  private static final int dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS;
+  private static final int parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS;
+  private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private static final int blockSize = cellSize * 3;
+  private static final int groupSize = dataBlkNum + parityBlkNum;
+  private static final int dnNum = groupSize + parityBlkNum;
+  
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private DistributedFileSystem fs;
+  // Map: DatanodeID -> datanode index in cluster
+  private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>();
+
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();
+    cluster.waitActive();
+    
+    fs = cluster.getFileSystem();
+    fs.getClient().createErasureCodingZone("/", null, 0);
+
+    List<DataNode> datanodes = cluster.getDataNodes();
+    for (int i = 0; i < dnNum; i++) {
+      dnMap.put(datanodes.get(i).getDatanodeId(), i);
+    }
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneParityBlock() throws Exception {
+    int fileLen = 10 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneParityBlock1() throws Exception {
+    int fileLen = cellSize + cellSize/10;
+    assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, 0, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneParityBlock2() throws Exception {
+    int fileLen = 1;
+    assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, 0, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneParityBlock3() throws Exception {
+    int fileLen = 3 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, 0, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverThreeParityBlocks() throws Exception {
+    int fileLen = 10 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverThreeDataBlocks() throws Exception {
+    int fileLen = 10 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverThreeDataBlocks1() throws Exception {
+    int fileLen = 3 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, 1, 3);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneDataBlock() throws Exception {
+    int fileLen = 10 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneDataBlock1() throws Exception {
+    int fileLen = cellSize + cellSize/10;
+    assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, 1, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverOneDataBlock2() throws Exception {
+    int fileLen = 1;
+    assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, 1, 1);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverAnyBlocks() throws Exception {
+    int fileLen = 3 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2);
+  }
+  
+  @Test(timeout = 120000)
+  public void testRecoverAnyBlocks1() throws Exception {
+    int fileLen = 10 * blockSize + blockSize/10;
+    assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, 2, 3);
+  }
+  
+  /**
+   * Test the recovery of file blocks.
+   * 1. Check that each replica is recovered on the target datanode,
+   *    and verify the block replica length, generation stamp, and content.
+   * 2. Read the file and verify its content.
+   */
+  private void assertFileBlocksRecovery(String fileName, int fileLen,
+      int recovery, int toRecoverBlockNum) throws Exception {
+    if (recovery != 0 && recovery != 1 && recovery != 2) {
+      Assert.fail("Invalid recovery: 0 recovers parity blocks, "
+          + "1 recovers data blocks, 2 recovers any blocks.");
+    }
+    if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) {
+      Assert.fail("toRecoverBlockNum should be between 1 and " + parityBlkNum);
+    }
+    
+    Path file = new Path(fileName);
+    
+    testCreateStripedFile(file, fileLen);
+    
+    LocatedBlocks locatedBlocks = getLocatedBlocks(file);
+    assertEquals(locatedBlocks.getFileLength(), fileLen);
+    
+    LocatedStripedBlock lastBlock = 
+        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+    
+    DatanodeInfo[] storageInfos = lastBlock.getLocations();
+    int[] indices = lastBlock.getBlockIndices();
+    
+    BitSet bitset = new BitSet(dnNum);
+    for (DatanodeInfo storageInfo : storageInfos) {
+      bitset.set(dnMap.get(storageInfo));
+    }
+    
+    int[] toDead = new int[toRecoverBlockNum];
+    int n = 0;
+    for (int i = 0; i < indices.length; i++) {
+      if (n < toRecoverBlockNum) {
+        if (recovery == 0) {
+          if (indices[i] >= dataBlkNum) {
+            toDead[n++] = i;
+          }
+        } else if (recovery == 1) {
+          if (indices[i] < dataBlkNum) {
+            toDead[n++] = i;
+          }
+        } else {
+          toDead[n++] = i;
+        }
+      } else {
+        break;
+      }
+    }
+    
+    DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum];
+    int[] deadDnIndices = new int[toRecoverBlockNum];
+    ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum];
+    File[] replicas = new File[toRecoverBlockNum];
+    File[] metadatas = new File[toRecoverBlockNum];
+    byte[][] replicaContents = new byte[toRecoverBlockNum][];
+    for (int i = 0; i < toRecoverBlockNum; i++) {
+      dataDNs[i] = storageInfos[toDead[i]];
+      deadDnIndices[i] = dnMap.get(dataDNs[i]);
+      
+      // Check the block replica file on the datanode before it is shut down.
+      blocks[i] = StripedBlockUtil.constructInternalBlock(
+          lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]);
+      replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]);
+      metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]);
+      // the block replica on the datanode should be the same as expected
+      assertEquals(replicas[i].length(), 
+          StripedBlockUtil.getInternalBlockLength(
+          lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]]));
+      assertTrue(metadatas[i].getName().
+          endsWith(blocks[i].getGenerationStamp() + ".meta"));
+      replicaContents[i] = readReplica(replicas[i]);
+    }
+    
+    int cellsNum = (fileLen - 1) / cellSize + 1;
+    int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum;
+
+    try {
+      DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum];
+      for (int i = 0; i < toRecoverBlockNum; i++) {
+        /*
+         * Kill the datanode that contains one replica. We need to make sure
+         * it is marked dead on the namenode: clear its update time and
+         * trigger the NN to check heartbeats.
+         */
+        DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]);
+        dn.shutdown();
+        dnIDs[i] = dn.getDatanodeId();
+      }
+      setDataNodesDead(dnIDs);
+      
+      // Check the locatedBlocks of the file again
+      locatedBlocks = getLocatedBlocks(file);
+      lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+      storageInfos = lastBlock.getLocations();
+      assertEquals(storageInfos.length, groupSize - toRecoverBlockNum);
+      
+      int[] targetDNs = new int[dnNum - groupSize];
+      n = 0;
+      for (int i = 0; i < dnNum; i++) {
+        if (!bitset.get(i)) { // does not contain a replica of the block.
+          targetDNs[n++] = i;
+        }
+      }
+      
+      waitForRecoveryFinished(file, groupSize);
+      
+      targetDNs = sortTargetsByReplicas(blocks, targetDNs);
+      
+      // Check the replica on the new target node.
+      for (int i = 0; i < toRecoverBlockNum; i++) {
+        File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]);
+        File metadataAfterRecovery = 
+            cluster.getBlockMetadataFile(targetDNs[i], blocks[i]);
+        assertEquals(replicaAfterRecovery.length(), replicas[i].length());
+        assertTrue(metadataAfterRecovery.getName().
+            endsWith(blocks[i].getGenerationStamp() + ".meta"));
+        byte[] replicaContentAfterRecovery = readReplica(replicaAfterRecovery);
+        
+        Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery);
+      }
+    } finally {
+      for (int i = 0; i < toRecoverBlockNum; i++) {
+        restartDataNode(deadDnIndices[i]);
+      }
+      cluster.waitActive();
+    }
+    fs.delete(file, true);
+  }
+  
+  private void setDataNodesDead(DatanodeID[] dnIDs) throws IOException {
+    for (DatanodeID dn : dnIDs) {
+      DatanodeDescriptor dnd =
+          NameNodeAdapter.getDatanode(cluster.getNamesystem(), dn);
+      DFSTestUtil.setDatanodeDead(dnd);
+    }
+    
+    BlockManagerTestUtil.checkHeartbeat(cluster.getNamesystem().getBlockManager());
+  }
+  
+  private void restartDataNode(int dn) {
+    try {
+      cluster.restartDataNode(dn, true, true);
+    } catch (IOException e) { // ignored: best-effort restart during cleanup
+    }
+  }
+  
+  private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
+    int[] result = new int[blocks.length];
+    for (int i = 0; i < blocks.length; i++) {
+      result[i] = -1;
+      for (int j = 0; j < targetDNs.length; j++) {
+        if (targetDNs[j] != -1) {
+          File replica = cluster.getBlockFile(targetDNs[j], blocks[i]);
+          if (replica != null) {
+            result[i] = targetDNs[j];
+            targetDNs[j] = -1;
+            break;
+          }
+        }
+      }
+      if (result[i] == -1) {
+        Assert.fail("Failed to recover striped block: " + blocks[i].getBlockId());
+      }
+    }
+    return result;
+  }
+  
+  private byte[] readReplica(File replica) throws IOException {
+    int length = (int)replica.length();
+    ByteArrayOutputStream content = new ByteArrayOutputStream(length);
+    FileInputStream in = new FileInputStream(replica);
+    try {
+      byte[] buffer = new byte[1024];
+      int total = 0;
+      while (total < length) {
+        int n = in.read(buffer);
+        if (n <= 0) {
+          break;
+        }
+        content.write(buffer, 0, n);
+        total += n;
+      }
+      if (total < length) {
+        Assert.fail("Failed to read all content of replica");
+      }
+      return content.toByteArray();
+    } finally {
+      in.close();
+    }
+  }
+  
+  private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize) 
+      throws Exception {
+    final int ATTEMPTS = 60;
+    for (int i = 0; i < ATTEMPTS; i++) {
+      LocatedBlocks locatedBlocks = getLocatedBlocks(file);
+      LocatedStripedBlock lastBlock = 
+          (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+      DatanodeInfo[] storageInfos = lastBlock.getLocations();
+      if (storageInfos.length >= groupSize) {
+        return locatedBlocks;
+      }
+      Thread.sleep(1000);
+    }
+    throw new IOException("Timed out waiting for EC block recovery.");
+  }
+  
+  private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
+    return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
+  }
+  
+  private void testCreateStripedFile(Path file, int dataLen)
+      throws IOException {
+    final byte[] data = new byte[dataLen];
+    ThreadLocalRandom.current().nextBytes(data);
+    writeContents(file, data);
+  }
+  
+  void writeContents(Path file, byte[] contents)
+      throws IOException {
+    FSDataOutputStream out = fs.create(file);
+    try {
+      out.write(contents, 0, contents.length);
+    } finally {
+      out.close();
+    }
+  }
+}
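
The recovery assertions above predict each internal block's size with
StripedBlockUtil.getInternalBlockLength(). A minimal sketch of that layout
arithmetic, assuming the default RS-(6,3) schema and 64 KB cells
(StripedLayoutSketch and internalBlockLength below are hypothetical
stand-ins, not the real helper):

    // Cells are laid out round-robin across the data blocks of a group, so a
    // group holding groupLen bytes fills whole stripes first, then a partial
    // stripe, then a partial cell. Parity blocks are as long as data block 0.
    class StripedLayoutSketch {
      static final int DATA_BLOCKS = 6;          // assumed RS-(6,3) schema
      static final int CELL_SIZE = 64 * 1024;    // assumed default cell size

      static long internalBlockLength(long groupLen, int dataIdx) {
        long stripeSize = (long) CELL_SIZE * DATA_BLOCKS;
        long len = (groupLen / stripeSize) * CELL_SIZE;  // full stripes
        long remainder = groupLen % stripeSize;          // the partial stripe
        long fullCells = remainder / CELL_SIZE;
        if (dataIdx < fullCells) {
          len += CELL_SIZE;                // this block got one more full cell
        } else if (dataIdx == fullCells) {
          len += remainder % CELL_SIZE;    // this block got the partial cell
        }
        return len;
      }
    }

For example, a group holding 10 cells of data gives blocks 0-3 two cells each
and blocks 4-5 one cell each, which is the per-index shape the length
assertions check.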

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
index 6cea7e8..6b4e46a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java
@@ -552,7 +552,7 @@ public class TestSafeMode {
       if(cluster!= null) cluster.shutdown();
     }
   }
-  
+
   void checkGetBlockLocationsWorks(FileSystem fs, Path fileName) throws IOException {
     FileStatus stat = fs.getFileStatus(fileName);
     try {  
@@ -560,7 +560,7 @@ public class TestSafeMode {
     } catch (SafeModeException e) {
       assertTrue("Should have not got safemode exception", false);
     } catch (RemoteException re) {
-      assertTrue("Should have not got safemode exception", false);   
+      assertTrue("Should have not got remote exception", false);
     }    
   }
 }
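
A side note on the assertion style in checkGetBlockLocationsWorks above:
assertTrue("msg", false) always fails when reached, and JUnit's Assert.fail
states that intent directly. A hedged sketch of the equivalent (not part of
the patch):

    try {
      fs.getFileBlockLocations(stat, 0, 1000);
    } catch (SafeModeException e) {
      Assert.fail("Should not have gotten safemode exception");
    } catch (RemoteException re) {
      Assert.fail("Should not have gotten remote exception");
    }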

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java
new file mode 100644
index 0000000..6f0bc71
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestSafeModeWithStripedFile {
+
+  static final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS;
+  static final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS;
+  static final int numDNs = DATA_BLK_NUM + PARITY_BLK_NUM;
+  static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  static final int blockSize = cellSize * 2;
+
+  static MiniDFSCluster cluster;
+  static Configuration conf;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    cluster.waitActive();
+
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testStripedFile0() throws IOException {
+    doTest(cellSize, 1);
+  }
+
+  @Test
+  public void testStripedFile1() throws IOException {
+    doTest(cellSize * 5, 5);
+  }
+
+  /**
+   * This utility writes a small block group whose size is given by the
+   * caller, then writes another two full-stripe block groups. It then
+   * shuts down all DNs and restarts them one by one, verifying the
+   * safemode status each time.
+   *
+   * @param smallSize file size of the small block group
+   * @param minStorages minimum number of storages for the block to be safe
+   */
+  private void doTest(int smallSize, int minStorages) throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    // add 1 block
+    byte[] data = StripedFileTestUtil.generateBytes(smallSize);
+    Path smallFilePath = new Path("/testStripedFile_" + smallSize);
+    DFSTestUtil.writeFile(fs, smallFilePath, data);
+
+    // If we only have 1 block, NN won't enter safemode in the first place
+    // because the threshold is 0 blocks.
+    // So we need to add another 2 blocks.
+    int bigSize = blockSize * DATA_BLK_NUM * 2;
+    Path bigFilePath = new Path("/testStripedFile_" + bigSize);
+    data = StripedFileTestUtil.generateBytes(bigSize);
+    DFSTestUtil.writeFile(fs, bigFilePath, data);
+    // now we have 3 blocks. NN needs 2 blocks to reach the threshold 0.9 of
+    // total blocks 3.
+
+    // stopping all DNs
+    List<MiniDFSCluster.DataNodeProperties> dnprops = Lists.newArrayList();
+    LocatedBlocks lbs = cluster.getNameNodeRpc()
+        .getBlockLocations(smallFilePath.toString(), 0, smallSize);
+    DatanodeInfo[] locations = lbs.get(0).getLocations();
+    for (DatanodeInfo loc : locations) {
+      // keep the DNs that host smallFile at the head of dnprops
+      dnprops.add(cluster.stopDataNode(loc.getName()));
+    }
+    for (int i = 0; i < numDNs - locations.length; i++) {
+      dnprops.add(cluster.stopDataNode(0));
+    }
+
+    cluster.restartNameNode(0);
+    NameNode nn = cluster.getNameNode();
+    assertTrue(cluster.getNameNode().isInSafeMode());
+    assertEquals(0, NameNodeAdapter.getSafeModeSafeBlocks(nn));
+
+    // the block of smallFile doesn't reach minStorages,
+    // so the safe blocks count doesn't increment.
+    for (int i = 0; i < minStorages - 1; i++) {
+      cluster.restartDataNode(dnprops.remove(0));
+      cluster.triggerBlockReports();
+      assertEquals(0, NameNodeAdapter.getSafeModeSafeBlocks(nn));
+    }
+
+    // the block of smallFile reaches minStorages,
+    // so the safe blocks count increments.
+    cluster.restartDataNode(dnprops.remove(0));
+    cluster.triggerBlockReports();
+    assertEquals(1, NameNodeAdapter.getSafeModeSafeBlocks(nn));
+
+    // the 2 blocks of bigFile need DATA_BLK_NUM storages to be safe
+    for (int i = minStorages; i < DATA_BLK_NUM - 1; i++) {
+      cluster.restartDataNode(dnprops.remove(0));
+      cluster.triggerBlockReports();
+      assertTrue(nn.isInSafeMode());
+    }
+
+    cluster.restartDataNode(dnprops.remove(0));
+    cluster.triggerBlockReports();
+    assertFalse(nn.isInSafeMode());
+  }
+
+}
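
The minStorages values passed to doTest follow from how safe-block counting
works for striped groups: a group needs a report for each of its real data
blocks, and a group smaller than one full stripe has fewer than DATA_BLK_NUM
of them. A sketch of that threshold, assuming the layout above
(minSafeStorages is a hypothetical helper, not HDFS code):

    static int minSafeStorages(long fileSize, int cellSize, int dataBlkNum) {
      // Number of cells the file occupies, rounded up.
      int numCells = (int) ((fileSize - 1) / cellSize + 1);
      // One storage per real data block, capped at a full stripe.
      return Math.min(numCells, dataBlkNum);
    }

This matches the two cases exercised: a one-cell file needs 1 storage
(doTest(cellSize, 1)) and a five-cell file needs 5 (doTest(cellSize * 5, 5)).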

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
new file mode 100644
index 0000000..089a134
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.web.WebHdfsConstants;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock;
+
+public class TestWriteReadStripedFile {
+  public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class);
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static Configuration conf = new HdfsConfiguration();
+
+  static {
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+        .getLogger().setLevel(Level.ALL);
+  }
+
+  @Before
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testFileEmpty() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EmptyFile", 0);
+    testOneFileUsingDFSStripedInputStream("/EmptyFile2", 0, true);
+  }
+
+  @Test
+  public void testFileSmallerThanOneCell1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", 1, true);
+  }
+
+  @Test
+  public void testFileSmallerThanOneCell2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", cellSize - 1,
+        true);
+  }
+
+  @Test
+  public void testFileEqualsWithOneCell() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize);
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell2", cellSize, true);
+  }
+
+  @Test
+  public void testFileSmallerThanOneStripe1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
+        cellSize * dataBlocks - 1);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2",
+        cellSize * dataBlocks - 1, true);
+  }
+
+  @Test
+  public void testFileSmallerThanOneStripe2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe",
+        cellSize + 123);
+    testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2",
+        cellSize + 123, true);
+  }
+
+  @Test
+  public void testFileEqualsWithOneStripe() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe",
+        cellSize * dataBlocks);
+    testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe2",
+        cellSize * dataBlocks, true);
+  }
+
+  @Test
+  public void testFileMoreThanOneStripe1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1",
+        cellSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe12",
+        cellSize * dataBlocks + 123, true);
+  }
+
+  @Test
+  public void testFileMoreThanOneStripe2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2",
+        cellSize * dataBlocks + cellSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe22",
+        cellSize * dataBlocks + cellSize * dataBlocks + 123, true);
+  }
+
+  @Test
+  public void testLessThanFullBlockGroup() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup",
+        cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize);
+    testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup2",
+        cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize, true);
+  }
+
+  @Test
+  public void testFileFullBlockGroup() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/FullBlockGroup",
+        blockSize * dataBlocks);
+    testOneFileUsingDFSStripedInputStream("/FullBlockGroup2",
+        blockSize * dataBlocks, true);
+  }
+
+  @Test
+  public void testFileMoreThanABlockGroup1() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1",
+        blockSize * dataBlocks + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup12",
+        blockSize * dataBlocks + 123, true);
+  }
+
+  @Test
+  public void testFileMoreThanABlockGroup2() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2",
+        blockSize * dataBlocks + cellSize + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup22",
+        blockSize * dataBlocks + cellSize + 123, true);
+  }
+
+
+  @Test
+  public void testFileMoreThanABlockGroup3() throws IOException {
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3",
+        blockSize * dataBlocks * 3 + cellSize * dataBlocks
+            + cellSize + 123);
+    testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup32",
+        blockSize * dataBlocks * 3 + cellSize * dataBlocks
+            + cellSize + 123, true);
+  }
+
+  private void testOneFileUsingDFSStripedInputStream(String src, int fileLength)
+      throws IOException {
+    testOneFileUsingDFSStripedInputStream(src, fileLength, false);
+  }
+
+  private void testOneFileUsingDFSStripedInputStream(String src, int fileLength,
+      boolean withDataNodeFailure) throws IOException {
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+    Path srcPath = new Path(src);
+    DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+
+    if (withDataNodeFailure) {
+      int dnIndex = 1; // TODO: StripedFileTestUtil.random.nextInt(dataBlocks);
+      LOG.info("stop DataNode " + dnIndex);
+      stopDataNode(srcPath, dnIndex);
+    }
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        largeBuf);
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        ByteBuffer.allocate(fileLength + 100));
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        smallBuf);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        ByteBuffer.allocate(1024));
+  }
+
+  private void stopDataNode(Path path, int failedDNIdx)
+      throws IOException {
+    BlockLocation[] locs = fs.getFileBlockLocations(path, 0, cellSize);
+    if (locs != null && locs.length > 0) {
+      String name = (locs[0].getNames())[failedDNIdx];
+      for (DataNode dn : cluster.getDataNodes()) {
+        int port = dn.getXferPort();
+        if (name.contains(Integer.toString(port))) {
+          dn.shutdown();
+          break;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testWriteReadUsingWebHdfs() throws Exception {
+    int fileLength = blockSize * dataBlocks + cellSize + 123;
+
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+    FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+        WebHdfsConstants.WEBHDFS_SCHEME);
+    Path srcPath = new Path("/testWriteReadUsingWebHdfs");
+    DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    // TODO: HDFS-8797
+    //StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf);
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf);
+    // webhdfs doesn't support bytebuffer read
+  }
+}
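
The file lengths probed above march across the geometry boundaries of a
striped file. A sketch of those boundaries, assuming 64 KB cells, 6 data
blocks, and 4 stripes per block for StripedFileTestUtil's constants (these
values are assumptions here, not read from the patch):

    int cellSize = 64 * 1024;
    int dataBlocks = 6;
    int stripeSize = cellSize * dataBlocks;       // one stripe across data blocks
    int blockSize = cellSize * 4;                 // one internal block: 4 stripes
    int blockGroupSize = blockSize * dataBlocks;  // one full block group
    int[] probes = {
        0,                               // empty file
        1,                               // smaller than one cell
        cellSize,                        // exactly one cell
        stripeSize - 1,                  // smaller than one stripe
        stripeSize,                      // exactly one stripe
        blockGroupSize,                  // a full block group
        blockGroupSize + cellSize + 123  // spills into a second group
    };

Each boundary forces a different path in the striped input stream: partial
cells, partial stripes, and reads that straddle block group edges.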

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
new file mode 100644
index 0000000..3679c5f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
+
+public class TestWriteStripedFileWithFailure {
+  public static final Log LOG = LogFactory
+      .getLog(TestWriteStripedFileWithFailure.class);
+  private static MiniDFSCluster cluster;
+  private static FileSystem fs;
+  private static Configuration conf = new HdfsConfiguration();
+  private final int smallFileLength = blockSize * dataBlocks - 123;
+  private final int largeFileLength = blockSize * dataBlocks + 123;
+  private final int[] fileLengths = {smallFileLength, largeFileLength};
+
+  public void setup() throws IOException {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+    cluster.getFileSystem().getClient().createErasureCodingZone("/",
+        null, cellSize);
+    fs = cluster.getFileSystem();
+  }
+
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  // Test writing a file with some DataNode failures
+  @Test(timeout = 300000)
+  public void testWriteStripedFileWithDNFailure() throws IOException {
+    for (int fileLength : fileLengths) {
+      for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
+        for (int parityDelNum = 0; (dataDelNum + parityDelNum) < 4; parityDelNum++) {
+          try {
+            // setup a new cluster with no dead datanode
+            setup();
+            writeFileWithDNFailure(fileLength, dataDelNum, parityDelNum);
+          } catch (IOException ioe) {
+            String fileType = fileLength < (blockSize * dataBlocks) ?
+                "smallFile" : "largeFile";
+            LOG.error("Failed to write file with DN failure:"
+                + " fileType = " + fileType
+                + ", dataDelNum = " + dataDelNum
+                + ", parityDelNum = " + parityDelNum);
+            throw ioe;
+          } finally {
+            // tear down the cluster
+            tearDown();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Test writing a file while shutting down some DNs (data DNs, parity DNs, or both).
+   * @param fileLength file length
+   * @param dataDNFailureNum number of data DNs to shut down
+   * @param parityDNFailureNum number of parity DNs to shut down
+   * @throws IOException
+   */
+  private void writeFileWithDNFailure(int fileLength,
+      int dataDNFailureNum, int parityDNFailureNum) throws IOException {
+    String fileType = fileLength < (blockSize * dataBlocks) ?
+        "smallFile" : "largeFile";
+    String src = "/dnFailure_" + dataDNFailureNum + "_" + parityDNFailureNum
+        + "_" + fileType;
+    LOG.info("writeFileWithDNFailure: file = " + src
+        + ", fileType = " + fileType
+        + ", dataDNFailureNum = " + dataDNFailureNum
+        + ", parityDNFailureNum = " + parityDNFailureNum);
+
+    Path srcPath = new Path(src);
+    final AtomicInteger pos = new AtomicInteger();
+    final FSDataOutputStream out = fs.create(srcPath);
+    final DFSStripedOutputStream stripedOut
+        = (DFSStripedOutputStream)out.getWrappedStream();
+
+    int[] dataDNFailureIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
+        dataDNFailureNum);
+    Assert.assertNotNull(dataDNFailureIndices);
+    int[] parityDNFailureIndices = StripedFileTestUtil.randomArray(dataBlocks,
+        dataBlocks + parityBlocks, parityDNFailureNum);
+    Assert.assertNotNull(parityDNFailureIndices);
+
+    int[] failedDataNodes = new int[dataDNFailureNum + parityDNFailureNum];
+    System.arraycopy(dataDNFailureIndices, 0, failedDataNodes,
+        0, dataDNFailureIndices.length);
+    System.arraycopy(parityDNFailureIndices, 0, failedDataNodes,
+        dataDNFailureIndices.length, parityDNFailureIndices.length);
+
+    final int killPos = fileLength/2;
+    for (; pos.get() < fileLength; ) {
+      final int i = pos.getAndIncrement();
+      if (i == killPos) {
+        for(int failedDn : failedDataNodes) {
+          StripedFileTestUtil.killDatanode(cluster, stripedOut, failedDn, pos);
+        }
+      }
+      write(out, i);
+    }
+    out.close();
+
+    // make sure the expected number of DataNodes were killed
+    int dnFailureNum = dataDNFailureNum + parityDNFailureNum;
+    Assert.assertEquals(cluster.getDataNodes().size(), numDNs - dnFailureNum);
+
+    byte[] smallBuf = new byte[1024];
+    byte[] largeBuf = new byte[fileLength + 100];
+    final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+    StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
+    StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
+        smallBuf);
+    StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
+
+    // delete the file
+    fs.delete(srcPath, true);
+  }
+
+  void write(FSDataOutputStream out, int i) throws IOException {
+    try {
+      out.write(StripedFileTestUtil.getByte(i));
+    } catch (IOException e) {
+      throw new IOException("Failed at i=" + i, e);
+    }
+  }
+}
\ No newline at end of file
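
The loop bounds in testWriteStripedFileWithDNFailure (dataDelNum +
parityDelNum < 4) encode the fault tolerance of the schema: RS-(6,3)
survives any combination of at most three concurrent failures. A sketch of
that bound, assuming those schema constants (isRecoverable is illustrative,
not HDFS code):

    // Reed-Solomon can rebuild from any dataBlocks-sized subset of the group,
    // so up to parityBlocks total losses, data or parity, are recoverable.
    static boolean isRecoverable(int failedData, int failedParity, int parityBlocks) {
      return failedData + failedParity <= parityBlocks;
    }

    // isRecoverable(3, 0, 3) == true; isRecoverable(2, 2, 3) == false.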

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java
index 9f8aef5..e944b81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java
@@ -126,7 +126,8 @@ public class TestLayoutVersion {
     EnumSet<NameNodeLayoutVersion.Feature> compatibleFeatures = EnumSet.of(
         NameNodeLayoutVersion.Feature.TRUNCATE,
         NameNodeLayoutVersion.Feature.APPEND_NEW_BLOCK,
-        NameNodeLayoutVersion.Feature.QUOTA_BY_STORAGE_TYPE);
+        NameNodeLayoutVersion.Feature.QUOTA_BY_STORAGE_TYPE,
+        NameNodeLayoutVersion.Feature.ERASURE_CODING);
     for (LayoutFeature f : compatibleFeatures) {
       assertEquals(String.format("Expected minimum compatible layout version " +
           "%d for feature %s.", baseLV, f), baseLV,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
index c7233bd..3675e63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.fs.permission.AclEntry;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
@@ -63,15 +65,21 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -80,6 +88,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -181,40 +190,58 @@ public class TestPBHelper {
     assertEquals(b, b2);
   }
 
-  private static BlockWithLocations getBlockWithLocations(int bid) {
+  private static BlockWithLocations getBlockWithLocations(
+      int bid, boolean isStriped) {
     final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
     final String[] storageIDs = {"s1", "s2", "s3"};
     final StorageType[] storageTypes = {
         StorageType.DISK, StorageType.DISK, StorageType.DISK};
-    return new BlockWithLocations(new Block(bid, 0, 1),
+    final byte[] indices = {0, 1, 2};
+    final short dataBlkNum = 6;
+    BlockWithLocations blkLocs = new BlockWithLocations(new Block(bid, 0, 1),
         datanodeUuids, storageIDs, storageTypes);
+    if (isStriped) {
+      blkLocs = new StripedBlockWithLocations(blkLocs, indices, dataBlkNum);
+    }
+    return blkLocs;
   }
 
   private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {
     assertEquals(locs1.getBlock(), locs2.getBlock());
     assertTrue(Arrays.equals(locs1.getStorageIDs(), locs2.getStorageIDs()));
+    if (locs1 instanceof StripedBlockWithLocations) {
+      assertTrue(Arrays.equals(((StripedBlockWithLocations) locs1).getIndices(),
+          ((StripedBlockWithLocations) locs2).getIndices()));
+    }
   }
 
   @Test
   public void testConvertBlockWithLocations() {
-    BlockWithLocations locs = getBlockWithLocations(1);
-    BlockWithLocationsProto locsProto = PBHelper.convert(locs);
-    BlockWithLocations locs2 = PBHelper.convert(locsProto);
-    compare(locs, locs2);
+    boolean[] testSuite = new boolean[]{false, true};
+    for (int i = 0; i < testSuite.length; i++) {
+      BlockWithLocations locs = getBlockWithLocations(1, testSuite[i]);
+      BlockWithLocationsProto locsProto = PBHelper.convert(locs);
+      BlockWithLocations locs2 = PBHelper.convert(locsProto);
+      compare(locs, locs2);
+    }
   }
 
   @Test
   public void testConvertBlocksWithLocations() {
-    BlockWithLocations[] list = new BlockWithLocations[] {
-        getBlockWithLocations(1), getBlockWithLocations(2) };
-    BlocksWithLocations locs = new BlocksWithLocations(list);
-    BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
-    BlocksWithLocations locs2 = PBHelper.convert(locsProto);
-    BlockWithLocations[] blocks = locs.getBlocks();
-    BlockWithLocations[] blocks2 = locs2.getBlocks();
-    assertEquals(blocks.length, blocks2.length);
-    for (int i = 0; i < blocks.length; i++) {
-      compare(blocks[i], blocks2[i]);
+    boolean[] testSuite = new boolean[]{false, true};
+    for (int i = 0; i < testSuite.length; i++) {
+      BlockWithLocations[] list = new BlockWithLocations[]{
+          getBlockWithLocations(1, testSuite[i]),
+          getBlockWithLocations(2, testSuite[i])};
+      BlocksWithLocations locs = new BlocksWithLocations(list);
+      BlocksWithLocationsProto locsProto = PBHelper.convert(locs);
+      BlocksWithLocations locs2 = PBHelper.convert(locsProto);
+      BlockWithLocations[] blocks = locs.getBlocks();
+      BlockWithLocations[] blocks2 = locs2.getBlocks();
+      assertEquals(blocks.length, blocks2.length);
+      for (int j = 0; j < blocks.length; j++) {
+        compare(blocks[j], blocks2[j]);
+      }
     }
   }
 
@@ -489,16 +516,16 @@ public class TestPBHelper {
   @Test
   public void testConvertLocatedBlock() {
     LocatedBlock lb = createLocatedBlock();
-    LocatedBlockProto lbProto = PBHelper.convert(lb);
-    LocatedBlock lb2 = PBHelper.convert(lbProto);
+    LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
+    LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
     compare(lb,lb2);
   }
 
   @Test
   public void testConvertLocatedBlockNoStorageMedia() {
     LocatedBlock lb = createLocatedBlockNoStorageMedia();
-    LocatedBlockProto lbProto = PBHelper.convert(lb);
-    LocatedBlock lb2 = PBHelper.convert(lbProto);
+    LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb);
+    LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto);
     compare(lb,lb2);
   }
 
@@ -508,8 +535,8 @@ public class TestPBHelper {
     for (int i=0;i<3;i++) {
       lbl.add(createLocatedBlock());
     }
-    List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlock2(lbl);
-    List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlock(lbpl);
+    List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlocks2(lbl);
+    List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlocks(lbpl);
     assertEquals(lbl.size(), lbl2.size());
     for (int i=0;i<lbl.size();i++) {
       compare(lbl.get(i), lbl2.get(2));
@@ -522,8 +549,8 @@ public class TestPBHelper {
     for (int i=0;i<3;i++) {
       lbl[i] = createLocatedBlock();
     }
-    LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlock(lbl);
-    LocatedBlock [] lbl2 = PBHelper.convertLocatedBlock(lbpl);
+    LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlocks(lbl);
+    LocatedBlock [] lbl2 = PBHelper.convertLocatedBlocks(lbpl);
     assertEquals(lbl.length, lbl2.length);
     for (int i=0;i<lbl.length;i++) {
       compare(lbl[i], lbl2[i]);
@@ -639,4 +666,99 @@ public class TestPBHelper {
         .build();
     Assert.assertEquals(s, PBHelper.convert(PBHelper.convert(s)));
   }
+  
+  @Test
+  public void testBlockECRecoveryCommand() {
+    DatanodeInfo[] dnInfos0 = new DatanodeInfo[] {
+        DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
+    DatanodeStorageInfo targetDnInfos_0 = BlockManagerTestUtil
+        .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+            new DatanodeStorage("s00"));
+    DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil
+        .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+            new DatanodeStorage("s01"));
+    DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] {
+        targetDnInfos_0, targetDnInfos_1 };
+    short[] liveBlkIndices0 = new short[2];
+    BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo(
+        new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0,
+        liveBlkIndices0, ErasureCodingSchemaManager.getSystemDefaultSchema(),
+        64 * 1024);
+    DatanodeInfo[] dnInfos1 = new DatanodeInfo[] {
+        DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() };
+    DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil
+        .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+            new DatanodeStorage("s02"));
+    DatanodeStorageInfo targetDnInfos_3 = BlockManagerTestUtil
+        .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(),
+            new DatanodeStorage("s03"));
+    DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] {
+        targetDnInfos_2, targetDnInfos_3 };
+    short[] liveBlkIndices1 = new short[2];
+    BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo(
+        new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1,
+        liveBlkIndices1, ErasureCodingSchemaManager.getSystemDefaultSchema(),
+        64 * 1024);
+    List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>();
+    blkRecoveryInfosList.add(blkECRecoveryInfo0);
+    blkRecoveryInfosList.add(blkECRecoveryInfo1);
+    BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand(
+        DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList);
+    BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper
+        .convert(blkECRecoveryCmd);
+    blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto);
+    Iterator<BlockECRecoveryInfo> iterator = blkECRecoveryCmd.getECTasks()
+        .iterator();
+    assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next());
+    assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next());
+  }
+
+  private void assertBlockECRecoveryInfoEquals(
+      BlockECRecoveryInfo blkECRecoveryInfo1,
+      BlockECRecoveryInfo blkECRecoveryInfo2) {
+    assertEquals(blkECRecoveryInfo1.getExtendedBlock(),
+        blkECRecoveryInfo2.getExtendedBlock());
+
+    DatanodeInfo[] sourceDnInfos1 = blkECRecoveryInfo1.getSourceDnInfos();
+    DatanodeInfo[] sourceDnInfos2 = blkECRecoveryInfo2.getSourceDnInfos();
+    assertDnInfosEqual(sourceDnInfos1, sourceDnInfos2);
+
+    DatanodeInfo[] targetDnInfos1 = blkECRecoveryInfo1.getTargetDnInfos();
+    DatanodeInfo[] targetDnInfos2 = blkECRecoveryInfo2.getTargetDnInfos();
+    assertDnInfosEqual(targetDnInfos1, targetDnInfos2);
+
+    String[] targetStorageIDs1 = blkECRecoveryInfo1.getTargetStorageIDs();
+    String[] targetStorageIDs2 = blkECRecoveryInfo2.getTargetStorageIDs();
+    assertEquals(targetStorageIDs1.length, targetStorageIDs2.length);
+    for (int i = 0; i < targetStorageIDs1.length; i++) {
+      assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]);
+    }
+
+    short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices();
+    short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices();
+    for (int i = 0; i < liveBlockIndices1.length; i++) {
+      assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]);
+    }
+    
+    ECSchema ecSchema1 = blkECRecoveryInfo1.getECSchema();
+    ECSchema ecSchema2 = blkECRecoveryInfo2.getECSchema();
+    // Compare both schemas against the system default schema, since the
+    // system default is what this test used.
+    compareECSchemas(ErasureCodingSchemaManager.getSystemDefaultSchema(), ecSchema1);
+    compareECSchemas(ErasureCodingSchemaManager.getSystemDefaultSchema(), ecSchema2);
+  }
+
+  private void compareECSchemas(ECSchema ecSchema1, ECSchema ecSchema2) {
+    assertEquals(ecSchema1.getSchemaName(), ecSchema2.getSchemaName());
+    assertEquals(ecSchema1.getNumDataUnits(), ecSchema2.getNumDataUnits());
+    assertEquals(ecSchema1.getNumParityUnits(), ecSchema2.getNumParityUnits());
+  }
+
+  private void assertDnInfosEqual(DatanodeInfo[] dnInfos1,
+      DatanodeInfo[] dnInfos2) {
+    assertEquals(dnInfos1.length, dnInfos2.length);
+    for (int i = 0; i < dnInfos1.length; i++) {
+      compare(dnInfos1[i], dnInfos2[i]);
+    }
+  }
 }
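
The new PBHelper tests all follow the same convert/convert-back/compare
shape. A generic sketch of that round-trip pattern, with hypothetical
toProto, fromProto, and deepEquals stand-ins for the concrete converters:

    static <T, P> void assertRoundTrip(T original,
        java.util.function.Function<T, P> toProto,
        java.util.function.Function<P, T> fromProto,
        java.util.function.BiPredicate<T, T> deepEquals) {
      P proto = toProto.apply(original);
      T restored = fromProto.apply(proto);
      org.junit.Assert.assertTrue("round trip changed the value",
          deepEquals.test(original, restored));
    }

Field-by-field comparisons, as in assertBlockECRecoveryInfoEquals, stand in
for deepEquals when the type has no usable equals().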

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 194aa0f..3c149d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
@@ -80,6 +81,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -144,6 +146,22 @@ public class TestBalancer {
     LazyPersistTestCase.initCacheManipulator();
   }
 
+  int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
+  int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
+  int groupSize = dataBlocks + parityBlocks;
+  private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final static int stripesPerBlock = 4;
+  static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock;
+
+  static void initConfWithStripe(Configuration conf) {
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    SimulatedFSDataset.setFactory(conf);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+  }
+
   /* create a file with a length of <code>fileLen</code> */
   static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen,
       short replicationFactor, int nnIndex)
@@ -1529,6 +1547,75 @@ public class TestBalancer {
     }
   }
 
+  public void integrationTestWithStripedFile(Configuration conf) throws Exception {
+    initConfWithStripe(conf);
+    doTestBalancerWithStripedFile(conf);
+  }
+
+  @Test(timeout = 100000)
+  public void testBalancerWithStripedFile() throws Exception {
+    Configuration conf = new Configuration();
+    initConfWithStripe(conf);
+    doTestBalancerWithStripedFile(conf);
+  }
+
+  private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {
+    int numOfDatanodes = dataBlocks + parityBlocks + 2;
+    int numOfRacks = dataBlocks;
+    long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
+    long[] capacities = new long[numOfDatanodes];
+    for (int i = 0; i < capacities.length; i++) {
+      capacities[i] = capacity;
+    }
+    String[] racks = new String[numOfDatanodes];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      racks[i] = "/rack" + (i % numOfRacks);
+    }
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .racks(racks)
+        .simulatedCapacities(capacities)
+        .build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+      client.createErasureCodingZone("/", null, 0);
+
+      long totalCapacity = sum(capacities);
+
+      // fill the cluster with 30% data; including parity it will be 45% full
+      long fileLen = totalCapacity * 3 / 10;
+      long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
+      FileSystem fs = cluster.getFileSystem(0);
+      DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
+
+      // verify locations of striped blocks
+      LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+      // add one datanode
+      String newRack = "/rack" + (++numOfRacks);
+      cluster.startDataNodes(conf, 1, true, null,
+          new String[]{newRack}, null, new long[]{capacity});
+      totalCapacity += capacity;
+      cluster.triggerHeartbeats();
+
+      // run balancer and validate results
+      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);
+
+      // verify locations of striped blocks
+      locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * @param args
    */
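The capacity arithmetic in doTestBalancerWithStripedFile is worth spelling out: raw usage equals the logical file length times (dataBlocks + parityBlocks) / dataBlocks. A standalone sketch of that calculation (the 6 data / 3 parity figures assume the current HdfsConstants defaults; adjust if the schema changes):

    public class StripeOverhead {
      public static void main(String[] args) {
        long totalCapacity = 8 * 20L * 1024 * 1024; // e.g. 8 DNs at 20 MB each
        long fileLen = totalCapacity * 3 / 10;      // 30% logical data
        long rawUsed = fileLen * (6 + 3) / 6;       // data plus parity cells
        // 0.30 * 9/6 = 0.45, hence "45% full" in the comment above
        System.out.println((double) rawUsed / totalCapacity); // prints 0.45
      }
    }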

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 148135b..64d80bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -161,7 +161,7 @@ public class BlockManagerTestUtil {
    */
   public static int computeAllPendingWork(BlockManager bm) {
     int work = computeInvalidationWork(bm);
-    work += bm.computeReplicationWork(Integer.MAX_VALUE);
+    work += bm.computeBlockRecoveryWork(Integer.MAX_VALUE);
     return work;
   }
 
@@ -306,4 +306,12 @@ public class BlockManagerTestUtil {
       throws ExecutionException, InterruptedException {
     dm.getDecomManager().runMonitor();
   }
+
+  /**
+   * Add a block to the replicateBlocks queue of the given DataNode.
+   */
+  public static void addBlockToBeReplicated(DatanodeDescriptor node,
+      Block block, DatanodeStorageInfo[] targets) {
+    node.addBlockToBeReplicated(block, targets);
+  }
 }
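The new test hook simply exposes the DatanodeDescriptor replication queue to tests; a hedged usage sketch (node, block and targetNode are placeholders for whatever the calling test has set up):

    // queue one replication work item so that later scheduling decisions,
    // e.g. chooseSourceDatanodes, see this node against its hard limit
    DatanodeStorageInfo[] targets = { targetNode.getStorageInfos()[0] };
    BlockManagerTestUtil.addBlockToBeReplicated(node, block, targets);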

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index 5126aa7..bae4f1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -63,7 +63,7 @@ public class TestBlockInfo {
 
     final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
 
-    boolean added = blockInfo.addStorage(storage);
+    boolean added = blockInfo.addStorage(storage, blockInfo);
 
     Assert.assertTrue(added);
     Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
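The second argument to addStorage is the reported block: for a contiguous block the stored BlockInfo itself is passed, as in this hunk, while the striped variant uses the reported block's ID to work out which internal position of the group the storage holds. Both call shapes, sketched from the tests in this patch:

    // contiguous: reported block and stored block coincide
    boolean added = blockInfo.addStorage(storage, blockInfo);

    // striped: blocks[i] is the i-th internal block of the group, so the
    // storage is recorded at internal index i (see TestBlockInfoStriped below)
    info.addStorage(storageInfos[i], blocks[i]);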

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
new file mode 100644
index 0000000..6788770
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager;
+import org.apache.hadoop.io.erasurecode.ECSchema;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Test {@link BlockInfoStriped}
+ */
+public class TestBlockInfoStriped {
+  private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+  private static final long BASE_ID = -1600;
+  private static final Block baseBlock = new Block(BASE_ID);
+  private static final ECSchema testSchema
+      = ErasureCodingSchemaManager.getSystemDefaultSchema();
+  private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE;
+  private final BlockInfoStriped info = new BlockInfoStriped(baseBlock,
+      testSchema, cellSize);
+
+  private Block[] createReportedBlocks(int num) {
+    Block[] blocks = new Block[num];
+    for (int i = 0; i < num; i++) {
+      blocks[i] = new Block(BASE_ID + i);
+    }
+    return blocks;
+  }
+
+  /**
+   * Test adding storage and reported block
+   */
+  @Test
+  public void testAddStorage() {
+    // first add NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS storages, i.e., a complete
+    // group of blocks/storages
+    DatanodeStorageInfo[] storageInfos = DFSTestUtil.createDatanodeStorageInfos(
+        TOTAL_NUM_BLOCKS);
+    Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
+    int i = 0;
+    for (; i < storageInfos.length; i += 2) {
+      info.addStorage(storageInfos[i], blocks[i]);
+      Assert.assertEquals(i/2 + 1, info.numNodes());
+    }
+    i /= 2;
+    for (int j = 1; j < storageInfos.length; j += 2) {
+      Assert.assertTrue(info.addStorage(storageInfos[j], blocks[j]));
+      Assert.assertEquals(i + (j+1)/2, info.numNodes());
+    }
+
+    // check
+    byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
+    Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
+    Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length);
+    i = 0;
+    for (DatanodeStorageInfo storage : storageInfos) {
+      int index = info.findStorageInfo(storage);
+      Assert.assertEquals(i++, index);
+      Assert.assertEquals(index, indices[index]);
+    }
+
+    // the same block is reported from the same storage twice
+    i = 0;
+    for (DatanodeStorageInfo storage : storageInfos) {
+      Assert.assertTrue(info.addStorage(storage, blocks[i++]));
+    }
+    Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
+    Assert.assertEquals(TOTAL_NUM_BLOCKS, info.numNodes());
+    Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length);
+    i = 0;
+    for (DatanodeStorageInfo storage : storageInfos) {
+      int index = info.findStorageInfo(storage);
+      Assert.assertEquals(i++, index);
+      Assert.assertEquals(index, indices[index]);
+    }
+
+    // the same block is reported from another storage
+    DatanodeStorageInfo[] storageInfos2 = DFSTestUtil.createDatanodeStorageInfos(
+        TOTAL_NUM_BLOCKS * 2);
+    // only add the second half of storageInfos2
+    for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) {
+      info.addStorage(storageInfos2[i], blocks[i % TOTAL_NUM_BLOCKS]);
+      Assert.assertEquals(i + 1, info.getCapacity());
+      Assert.assertEquals(i + 1, info.numNodes());
+      indices = (byte[]) Whitebox.getInternalState(info, "indices");
+      Assert.assertEquals(i + 1, indices.length);
+    }
+    for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) {
+      int index = info.findStorageInfo(storageInfos2[i]);
+      Assert.assertEquals(i, index);
+      Assert.assertEquals(index - TOTAL_NUM_BLOCKS, indices[index]);
+    }
+  }
+
+  @Test
+  public void testRemoveStorage() {
+    // first add TOTAL_NUM_BLOCKS into the BlockInfoStriped
+    DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
+        TOTAL_NUM_BLOCKS);
+    Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
+    for (int i = 0; i < storages.length; i++) {
+      info.addStorage(storages[i], blocks[i]);
+    }
+
+    // remove two storages
+    info.removeStorage(storages[0]);
+    info.removeStorage(storages[2]);
+
+    // check
+    Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity());
+    Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes());
+    byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
+    for (int i = 0; i < storages.length; i++) {
+      int index = info.findStorageInfo(storages[i]);
+      if (i != 0 && i != 2) {
+        Assert.assertEquals(i, index);
+        Assert.assertEquals(index, indices[index]);
+      } else {
+        Assert.assertEquals(-1, index);
+        Assert.assertEquals(-1, indices[i]);
+      }
+    }
+
+    // the same block is reported from another storage
+    DatanodeStorageInfo[] storages2 = DFSTestUtil.createDatanodeStorageInfos(
+        TOTAL_NUM_BLOCKS * 2);
+    for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) {
+      info.addStorage(storages2[i], blocks[i % TOTAL_NUM_BLOCKS]);
+    }
+    // now we should have TOTAL_NUM_BLOCKS * 2 - 2 storages
+    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.numNodes());
+    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity());
+    indices = (byte[]) Whitebox.getInternalState(info, "indices");
+    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length);
+    int j = TOTAL_NUM_BLOCKS;
+    for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) {
+      int index = info.findStorageInfo(storages2[i]);
+      if (i == TOTAL_NUM_BLOCKS || i == TOTAL_NUM_BLOCKS + 2) {
+        Assert.assertEquals(i - TOTAL_NUM_BLOCKS, index);
+      } else {
+        Assert.assertEquals(j++, index);
+      }
+    }
+
+    // remove the storages from storages2
+    for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) {
+      info.removeStorage(storages2[i + TOTAL_NUM_BLOCKS]);
+    }
+    // now we should have TOTAL_NUM_BLOCKS - 2 storages
+    Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes());
+    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity());
+    indices = (byte[]) Whitebox.getInternalState(info, "indices");
+    Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length);
+    for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) {
+      if (i == 0 || i == 2) {
+        int index = info.findStorageInfo(storages2[i + TOTAL_NUM_BLOCKS]);
+        Assert.assertEquals(-1, index);
+      } else {
+        int index = info.findStorageInfo(storages[i]);
+        Assert.assertEquals(i, index);
+      }
+    }
+    for (int i = TOTAL_NUM_BLOCKS; i < TOTAL_NUM_BLOCKS * 2 - 2; i++) {
+      Assert.assertEquals(-1, indices[i]);
+      Assert.assertNull(info.getDatanode(i));
+    }
+  }
+
+  @Test
+  public void testReplaceBlock() {
+    DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(
+        TOTAL_NUM_BLOCKS);
+    Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS);
+    // add block/storage 0, 2, 4 into the BlockInfoStriped
+    for (int i = 0; i < storages.length; i += 2) {
+      Assert.assertEquals(AddBlockResult.ADDED,
+          storages[i].addBlock(info, blocks[i]));
+    }
+
+    BlockInfoStriped newBlockInfo = new BlockInfoStriped(info);
+    info.replaceBlock(newBlockInfo);
+
+    // make sure the newBlockInfo is correct
+    byte[] indices = (byte[]) Whitebox.getInternalState(newBlockInfo, "indices");
+    for (int i = 0; i < storages.length; i += 2) {
+      int index = newBlockInfo.findStorageInfo(storages[i]);
+      Assert.assertEquals(i, index);
+      Assert.assertEquals(index, indices[i]);
+
+      // make sure the newBlockInfo is added to the linked list of the storage
+      Assert.assertSame(newBlockInfo, storages[i].getBlockListHeadForTesting());
+      Assert.assertEquals(1, storages[i].numBlocks());
+      Assert.assertNull(newBlockInfo.getNext());
+    }
+  }
+
+  @Test
+  public void testWrite() {
+    long blkID = 1;
+    long numBytes = 1;
+    long generationStamp = 1;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE * 3);
+    byteBuffer.putLong(blkID).putLong(numBytes).putLong(generationStamp);
+
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(byteStream);
+    BlockInfoStriped blk = new BlockInfoStriped(new Block(blkID, numBytes,
+        generationStamp), testSchema, cellSize);
+
+    try {
+      blk.write(out);
+    } catch (Exception ex) {
+      fail("testWrite error: " + ex.getMessage());
+    }
+    assertEquals(byteBuffer.array().length, byteStream.toByteArray().length);
+    assertArrayEquals(byteBuffer.array(), byteStream.toByteArray());
+  }
+}
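All three tests above probe the same invariant on BlockInfoStriped's internal bookkeeping: capacity only grows, each occupied slot of the `indices` array records the internal block index held by the storage in that slot, and removeStorage leaves a -1 sentinel rather than compacting. A small checker capturing this, reusing the Whitebox access from the tests (expectedInternalIndices is a hypothetical helper input, not part of the patch):

    private static void assertIndicesInvariant(BlockInfoStriped info,
        DatanodeStorageInfo[] storages, byte[] expectedInternalIndices) {
      byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices");
      for (int i = 0; i < storages.length; i++) {
        int slot = info.findStorageInfo(storages[i]);
        if (slot >= 0) { // -1 means the storage has been removed
          Assert.assertEquals(expectedInternalIndices[i], indices[slot]);
        }
      }
    }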

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
index a7ba293..a447aaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 /**
@@ -51,7 +50,7 @@ public class TestBlockInfoUnderConstruction {
     DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
     DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
     blockInfo.initializeBlockRecovery(1);
-    BlockInfoContiguousUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
+    BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
     assertEquals(blockInfoRecovery[0], blockInfo);
 
     // Recovery attempt #2.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 2d3d90a..66a4681 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -383,7 +383,7 @@ public class TestBlockManager {
     for (int i = 1; i < pipeline.length; i++) {
       DatanodeStorageInfo storage = pipeline[i];
       bm.addBlock(storage, blockInfo, null);
-      blockInfo.addStorage(storage);
+      blockInfo.addStorage(storage, blockInfo);
     }
   }
 
@@ -393,7 +393,7 @@ public class TestBlockManager {
 
     for (DatanodeDescriptor dn : nodes) {
       for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
-        blockInfo.addStorage(storage);
+        blockInfo.addStorage(storage, blockInfo);
       }
     }
     return blockInfo;
@@ -453,8 +453,8 @@ public class TestBlockManager {
     assertEquals("Block not initially pending replication", 0,
         bm.pendingReplications.getNumReplicas(block));
     assertEquals(
-        "computeReplicationWork should indicate replication is needed", 1,
-        bm.computeReplicationWorkForBlocks(list_all));
+        "computeBlockRecoveryWork should indicate replication is needed", 1,
+        bm.computeRecoveryWorkForBlocks(list_all));
     assertTrue("replication is pending after work is computed",
         bm.pendingReplications.getNumReplicas(block) > 0);
 
@@ -508,35 +508,38 @@ public class TestBlockManager {
     assertNotNull("Chooses source node for a highest-priority replication"
         + " even if all available source nodes have reached their replication"
         + " limits below the hard limit.",
-        bm.chooseSourceDatanode(
-            aBlock,
+        bm.chooseSourceDatanodes(
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
-            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
-
-    assertNull("Does not choose a source node for a less-than-highest-priority"
-        + " replication since all available source nodes have reached"
-        + " their replication limits.",
-        bm.chooseSourceDatanode(
-            aBlock,
+            new ArrayList<Short>(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
+
+    assertEquals("Does not choose a source node for a 
less-than-highest-priority"
+            + " replication since all available source nodes have reached"
+            + " their replication limits.", 0,
+        bm.chooseSourceDatanodes(
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
-            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+            new ArrayList<Short>(),
+            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length);
 
     // Increase the replication count to test replication count > hard limit
     DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
     origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
 
-    assertNull("Does not choose a source node for a highest-priority"
-        + " replication when all available nodes exceed the hard limit.",
-        bm.chooseSourceDatanode(
-            aBlock,
+    assertEquals("Does not choose a source node for a highest-priority"
+            + " replication when all available nodes exceed the hard limit.", 
0,
+        bm.chooseSourceDatanodes(
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
             new NumberReplicas(),
-            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+            new ArrayList<Short>(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length);
   }
 
   @Test
@@ -557,30 +560,28 @@ public class TestBlockManager {
     assertNotNull("Chooses decommissioning source node for a normal 
replication"
         + " if all available source nodes have reached their replication"
         + " limits below the hard limit.",
-        bm.chooseSourceDatanode(
-            aBlock,
+        bm.chooseSourceDatanodes(
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
-            new NumberReplicas(),
-            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED));
+            new NumberReplicas(), new LinkedList<Short>(),
+            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]);
 
 
     // Increase the replication count to test replication count > hard limit
     DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
     origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
 
-    assertNull("Does not choose a source decommissioning node for a normal"
-        + " replication when all available nodes exceed the hard limit.",
-        bm.chooseSourceDatanode(
-            aBlock,
+    assertEquals("Does not choose a source decommissioning node for a normal"
+        + " replication when all available nodes exceed the hard limit.", 0,
+        bm.chooseSourceDatanodes(
+            bm.getStoredBlock(aBlock),
             cntNodes,
             liveNodes,
-            new NumberReplicas(),
-            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED));
+            new NumberReplicas(), new LinkedList<Short>(),
+            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length);
   }
 
-
-
   @Test
   public void testSafeModeIBR() throws Exception {
     DatanodeDescriptor node = spy(nodes.get(0));
