http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java new file mode 100644 index 0000000..9285fd7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRecoverStripedFile.java @@ -0,0 +1,400 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +public class TestRecoverStripedFile { + public static final Log LOG = LogFactory.getLog(TestRecoverStripedFile.class); + + private static final int dataBlkNum = HdfsConstants.NUM_DATA_BLOCKS; + private static final int parityBlkNum = HdfsConstants.NUM_PARITY_BLOCKS; + private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private static final int blockSize = cellSize * 3; + private static final int groupSize = dataBlkNum + parityBlkNum; + private static final int dnNum = groupSize + parityBlkNum; + + private MiniDFSCluster cluster; + private Configuration conf; + private DistributedFileSystem fs; + // Map: DatanodeID -> datanode index in cluster + private Map<DatanodeID, Integer> dnMap = new HashMap<DatanodeID, Integer>(); + + @Before + public void setup() throws IOException { + conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, cellSize - 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build();; + cluster.waitActive(); + + fs = cluster.getFileSystem(); + fs.getClient().createErasureCodingZone("/", null, 0); + + List<DataNode> datanodes = cluster.getDataNodes(); + for (int i = 0; i < dnNum; i++) { + dnMap.put(datanodes.get(i).getDatanodeId(), i); + } + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverOneParityBlock", fileLen, 0, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock1() throws Exception { + int fileLen = cellSize + cellSize/10; + assertFileBlocksRecovery("/testRecoverOneParityBlock1", fileLen, 0, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock2() throws Exception { + int fileLen = 1; + assertFileBlocksRecovery("/testRecoverOneParityBlock2", fileLen, 0, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock3() throws Exception { + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverOneParityBlock3", fileLen, 0, 1); + } + + @Test(timeout = 120000) + public void testRecoverThreeParityBlocks() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverThreeParityBlocks", fileLen, 0, 3); + } + + @Test(timeout = 120000) + public void testRecoverThreeDataBlocks() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverThreeDataBlocks", fileLen, 1, 3); + } + + @Test(timeout = 120000) + public void testRecoverThreeDataBlocks1() throws Exception { + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverThreeDataBlocks1", fileLen, 1, 3); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverOneDataBlock", fileLen, 1, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock1() throws Exception { + int fileLen = cellSize + cellSize/10; + assertFileBlocksRecovery("/testRecoverOneDataBlock1", fileLen, 1, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock2() throws Exception { + int fileLen = 1; + assertFileBlocksRecovery("/testRecoverOneDataBlock2", fileLen, 1, 1); + } + + @Test(timeout = 120000) + public void testRecoverAnyBlocks() throws Exception { + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverAnyBlocks", fileLen, 2, 2); + } + + @Test(timeout = 120000) + public void testRecoverAnyBlocks1() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksRecovery("/testRecoverAnyBlocks1", fileLen, 2, 3); + } + + /** + * Test the file blocks recovery. + * 1. Check the replica is recovered in the target datanode, + * and verify the block replica length, generationStamp and content. + * 2. Read the file and verify content. + */ + private void assertFileBlocksRecovery(String fileName, int fileLen, + int recovery, int toRecoverBlockNum) throws Exception { + if (recovery != 0 && recovery != 1 && recovery != 2) { + Assert.fail("Invalid recovery: 0 is to recovery parity blocks," + + "1 is to recovery data blocks, 2 is any."); + } + if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) { + Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum); + } + + Path file = new Path(fileName); + + testCreateStripedFile(file, fileLen); + + LocatedBlocks locatedBlocks = getLocatedBlocks(file); + assertEquals(locatedBlocks.getFileLength(), fileLen); + + LocatedStripedBlock lastBlock = + (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + + DatanodeInfo[] storageInfos = lastBlock.getLocations(); + int[] indices = lastBlock.getBlockIndices(); + + BitSet bitset = new BitSet(dnNum); + for (DatanodeInfo storageInfo : storageInfos) { + bitset.set(dnMap.get(storageInfo)); + } + + int[] toDead = new int[toRecoverBlockNum]; + int n = 0; + for (int i = 0; i < indices.length; i++) { + if (n < toRecoverBlockNum) { + if (recovery == 0) { + if (indices[i] >= dataBlkNum) { + toDead[n++] = i; + } + } else if (recovery == 1) { + if (indices[i] < dataBlkNum) { + toDead[n++] = i; + } + } else { + toDead[n++] = i; + } + } else { + break; + } + } + + DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum]; + int[] deadDnIndices = new int[toRecoverBlockNum]; + ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum]; + File[] replicas = new File[toRecoverBlockNum]; + File[] metadatas = new File[toRecoverBlockNum]; + byte[][] replicaContents = new byte[toRecoverBlockNum][]; + for (int i = 0; i < toRecoverBlockNum; i++) { + dataDNs[i] = storageInfos[toDead[i]]; + deadDnIndices[i] = dnMap.get(dataDNs[i]); + + // Check the block replica file on deadDn before it dead. + blocks[i] = StripedBlockUtil.constructInternalBlock( + lastBlock.getBlock(), cellSize, dataBlkNum, indices[toDead[i]]); + replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]); + metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]); + // the block replica on the datanode should be the same as expected + assertEquals(replicas[i].length(), + StripedBlockUtil.getInternalBlockLength( + lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[toDead[i]])); + assertTrue(metadatas[i].getName(). + endsWith(blocks[i].getGenerationStamp() + ".meta")); + replicaContents[i] = readReplica(replicas[i]); + } + + int cellsNum = (fileLen - 1) / cellSize + 1; + int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum; + + try { + DatanodeID[] dnIDs = new DatanodeID[toRecoverBlockNum]; + for (int i = 0; i < toRecoverBlockNum; i++) { + /* + * Kill the datanode which contains one replica + * We need to make sure it dead in namenode: clear its update time and + * trigger NN to check heartbeat. + */ + DataNode dn = cluster.getDataNodes().get(deadDnIndices[i]); + dn.shutdown(); + dnIDs[i] = dn.getDatanodeId(); + } + setDataNodesDead(dnIDs); + + // Check the locatedBlocks of the file again + locatedBlocks = getLocatedBlocks(file); + lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + storageInfos = lastBlock.getLocations(); + assertEquals(storageInfos.length, groupSize - toRecoverBlockNum); + + int[] targetDNs = new int[dnNum - groupSize]; + n = 0; + for (int i = 0; i < dnNum; i++) { + if (!bitset.get(i)) { // not contain replica of the block. + targetDNs[n++] = i; + } + } + + waitForRecoveryFinished(file, groupSize); + + targetDNs = sortTargetsByReplicas(blocks, targetDNs); + + // Check the replica on the new target node. + for (int i = 0; i < toRecoverBlockNum; i++) { + File replicaAfterRecovery = cluster.getBlockFile(targetDNs[i], blocks[i]); + File metadataAfterRecovery = + cluster.getBlockMetadataFile(targetDNs[i], blocks[i]); + assertEquals(replicaAfterRecovery.length(), replicas[i].length()); + assertTrue(metadataAfterRecovery.getName(). + endsWith(blocks[i].getGenerationStamp() + ".meta")); + byte[] replicaContentAfterRecovery = readReplica(replicaAfterRecovery); + + Assert.assertArrayEquals(replicaContents[i], replicaContentAfterRecovery); + } + } finally { + for (int i = 0; i < toRecoverBlockNum; i++) { + restartDataNode(toDead[i]); + } + cluster.waitActive(); + } + fs.delete(file, true); + } + + private void setDataNodesDead(DatanodeID[] dnIDs) throws IOException { + for (DatanodeID dn : dnIDs) { + DatanodeDescriptor dnd = + NameNodeAdapter.getDatanode(cluster.getNamesystem(), dn); + DFSTestUtil.setDatanodeDead(dnd); + } + + BlockManagerTestUtil.checkHeartbeat(cluster.getNamesystem().getBlockManager()); + } + + private void restartDataNode(int dn) { + try { + cluster.restartDataNode(dn, true, true); + } catch (IOException e) { + } + } + + private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) { + int[] result = new int[blocks.length]; + for (int i = 0; i < blocks.length; i++) { + result[i] = -1; + for (int j = 0; j < targetDNs.length; j++) { + if (targetDNs[j] != -1) { + File replica = cluster.getBlockFile(targetDNs[j], blocks[i]); + if (replica != null) { + result[i] = targetDNs[j]; + targetDNs[j] = -1; + break; + } + } + } + if (result[i] == -1) { + Assert.fail("Failed to recover striped block: " + blocks[i].getBlockId()); + } + } + return result; + } + + private byte[] readReplica(File replica) throws IOException { + int length = (int)replica.length(); + ByteArrayOutputStream content = new ByteArrayOutputStream(length); + FileInputStream in = new FileInputStream(replica); + try { + byte[] buffer = new byte[1024]; + int total = 0; + while (total < length) { + int n = in.read(buffer); + if (n <= 0) { + break; + } + content.write(buffer, 0, n); + total += n; + } + if (total < length) { + Assert.fail("Failed to read all content of replica"); + } + return content.toByteArray(); + } finally { + in.close(); + } + } + + private LocatedBlocks waitForRecoveryFinished(Path file, int groupSize) + throws Exception { + final int ATTEMPTS = 60; + for (int i = 0; i < ATTEMPTS; i++) { + LocatedBlocks locatedBlocks = getLocatedBlocks(file); + LocatedStripedBlock lastBlock = + (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + DatanodeInfo[] storageInfos = lastBlock.getLocations(); + if (storageInfos.length >= groupSize) { + return locatedBlocks; + } + Thread.sleep(1000); + } + throw new IOException ("Time out waiting for EC block recovery."); + } + + private LocatedBlocks getLocatedBlocks(Path file) throws IOException { + return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE); + } + + private void testCreateStripedFile(Path file, int dataLen) + throws IOException { + final byte[] data = new byte[dataLen]; + ThreadLocalRandom.current().nextBytes(data); + writeContents(file, data); + } + + void writeContents(Path file, byte[] contents) + throws IOException { + FSDataOutputStream out = fs.create(file); + try { + out.write(contents, 0, contents.length); + } finally { + out.close(); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java index 6cea7e8..6b4e46a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeMode.java @@ -552,7 +552,7 @@ public class TestSafeMode { if(cluster!= null) cluster.shutdown(); } } - + void checkGetBlockLocationsWorks(FileSystem fs, Path fileName) throws IOException { FileStatus stat = fs.getFileStatus(fileName); try { @@ -560,7 +560,7 @@ public class TestSafeMode { } catch (SafeModeException e) { assertTrue("Should have not got safemode exception", false); } catch (RemoteException re) { - assertTrue("Should have not got safemode exception", false); + assertTrue("Should have not got remote exception", false); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java new file mode 100644 index 0000000..6f0bc71 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSafeModeWithStripedFile.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestSafeModeWithStripedFile { + + static final short DATA_BLK_NUM = HdfsConstants.NUM_DATA_BLOCKS; + static final short PARITY_BLK_NUM = HdfsConstants.NUM_PARITY_BLOCKS; + static final int numDNs = DATA_BLK_NUM + PARITY_BLK_NUM; + static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + static final int blockSize = cellSize * 2; + + static MiniDFSCluster cluster; + static Configuration conf; + + @Before + public void setup() throws IOException { + conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); + cluster.waitActive(); + + } + + @After + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testStripedFile0() throws IOException { + doTest(cellSize, 1); + } + + @Test + public void testStripedFile1() throws IOException { + doTest(cellSize * 5, 5); + } + + /** + * This util writes a small block group whose size is given by caller. + * Then write another 2 full stripe blocks. + * Then shutdown all DNs and start again one by one. and verify the safemode + * status accordingly. + * + * @param smallSize file size of the small block group + * @param minStorages minimum replicas needed by the block so it can be safe + */ + private void doTest(int smallSize, int minStorages) throws IOException { + FileSystem fs = cluster.getFileSystem(); + // add 1 block + byte[] data = StripedFileTestUtil.generateBytes(smallSize); + Path smallFilePath = new Path("/testStripedFile_" + smallSize); + DFSTestUtil.writeFile(fs, smallFilePath, data); + + // If we only have 1 block, NN won't enter safemode in the first place + // because the threshold is 0 blocks. + // So we need to add another 2 blocks. + int bigSize = blockSize * DATA_BLK_NUM * 2; + Path bigFilePath = new Path("/testStripedFile_" + bigSize); + data = StripedFileTestUtil.generateBytes(bigSize); + DFSTestUtil.writeFile(fs, bigFilePath, data); + // now we have 3 blocks. NN needs 2 blocks to reach the threshold 0.9 of + // total blocks 3. + + // stopping all DNs + List<MiniDFSCluster.DataNodeProperties> dnprops = Lists.newArrayList(); + LocatedBlocks lbs = cluster.getNameNodeRpc() + .getBlockLocations(smallFilePath.toString(), 0, smallSize); + DatanodeInfo[] locations = lbs.get(0).getLocations(); + for (DatanodeInfo loc : locations) { + // keep the DNs that have smallFile in the head of dnprops + dnprops.add(cluster.stopDataNode(loc.getName())); + } + for (int i = 0; i < numDNs - locations.length; i++) { + dnprops.add(cluster.stopDataNode(0)); + } + + cluster.restartNameNode(0); + NameNode nn = cluster.getNameNode(); + assertTrue(cluster.getNameNode().isInSafeMode()); + assertEquals(0, NameNodeAdapter.getSafeModeSafeBlocks(nn)); + + // the block of smallFile doesn't reach minStorages, + // so the safe blocks count doesn't increment. + for (int i = 0; i < minStorages - 1; i++) { + cluster.restartDataNode(dnprops.remove(0)); + cluster.triggerBlockReports(); + assertEquals(0, NameNodeAdapter.getSafeModeSafeBlocks(nn)); + } + + // the block of smallFile reaches minStorages, + // so the safe blocks count increment. + cluster.restartDataNode(dnprops.remove(0)); + cluster.triggerBlockReports(); + assertEquals(1, NameNodeAdapter.getSafeModeSafeBlocks(nn)); + + // the 2 blocks of bigFile need DATA_BLK_NUM storages to be safe + for (int i = minStorages; i < DATA_BLK_NUM - 1; i++) { + cluster.restartDataNode(dnprops.remove(0)); + cluster.triggerBlockReports(); + assertTrue(nn.isInSafeMode()); + } + + cluster.restartDataNode(dnprops.remove(0)); + cluster.triggerBlockReports(); + assertFalse(nn.isInSafeMode()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java new file mode 100644 index 0000000..089a134 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.web.WebHdfsConstants; +import org.apache.hadoop.hdfs.web.WebHdfsTestUtil; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.stripesPerBlock; + +public class TestWriteReadStripedFile { + public static final Log LOG = LogFactory.getLog(TestWriteReadStripedFile.class); + private static MiniDFSCluster cluster; + private static FileSystem fs; + private static Configuration conf = new HdfsConfiguration(); + + static { + ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)) + .getLogger().setLevel(Level.ALL); + } + + @Before + public void setup() throws IOException { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); + fs = cluster.getFileSystem(); + } + + @After + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testFileEmpty() throws IOException { + testOneFileUsingDFSStripedInputStream("/EmptyFile", 0); + testOneFileUsingDFSStripedInputStream("/EmptyFile2", 0, true); + } + + @Test + public void testFileSmallerThanOneCell1() throws IOException { + testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", 1); + testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", 1, true); + } + + @Test + public void testFileSmallerThanOneCell2() throws IOException { + testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell", cellSize - 1); + testOneFileUsingDFSStripedInputStream("/SmallerThanOneCell2", cellSize - 1, + true); + } + + @Test + public void testFileEqualsWithOneCell() throws IOException { + testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell", cellSize); + testOneFileUsingDFSStripedInputStream("/EqualsWithOneCell2", cellSize, true); + } + + @Test + public void testFileSmallerThanOneStripe1() throws IOException { + testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", + cellSize * dataBlocks - 1); + testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2", + cellSize * dataBlocks - 1, true); + } + + @Test + public void testFileSmallerThanOneStripe2() throws IOException { + testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe", + cellSize + 123); + testOneFileUsingDFSStripedInputStream("/SmallerThanOneStripe2", + cellSize + 123, true); + } + + @Test + public void testFileEqualsWithOneStripe() throws IOException { + testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe", + cellSize * dataBlocks); + testOneFileUsingDFSStripedInputStream("/EqualsWithOneStripe2", + cellSize * dataBlocks, true); + } + + @Test + public void testFileMoreThanOneStripe1() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe1", + cellSize * dataBlocks + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe12", + cellSize * dataBlocks + 123, true); + } + + @Test + public void testFileMoreThanOneStripe2() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe2", + cellSize * dataBlocks + cellSize * dataBlocks + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanOneStripe22", + cellSize * dataBlocks + cellSize * dataBlocks + 123, true); + } + + @Test + public void testLessThanFullBlockGroup() throws IOException { + testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup", + cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize); + testOneFileUsingDFSStripedInputStream("/LessThanFullBlockGroup2", + cellSize * dataBlocks * (stripesPerBlock - 1) + cellSize, true); + } + + @Test + public void testFileFullBlockGroup() throws IOException { + testOneFileUsingDFSStripedInputStream("/FullBlockGroup", + blockSize * dataBlocks); + testOneFileUsingDFSStripedInputStream("/FullBlockGroup2", + blockSize * dataBlocks, true); + } + + @Test + public void testFileMoreThanABlockGroup1() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup1", + blockSize * dataBlocks + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup12", + blockSize * dataBlocks + 123, true); + } + + @Test + public void testFileMoreThanABlockGroup2() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup2", + blockSize * dataBlocks + cellSize + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup22", + blockSize * dataBlocks + cellSize + 123, true); + } + + + @Test + public void testFileMoreThanABlockGroup3() throws IOException { + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup3", + blockSize * dataBlocks * 3 + cellSize * dataBlocks + + cellSize + 123); + testOneFileUsingDFSStripedInputStream("/MoreThanABlockGroup32", + blockSize * dataBlocks * 3 + cellSize * dataBlocks + + cellSize + 123, true); + } + + private void testOneFileUsingDFSStripedInputStream(String src, int fileLength) + throws IOException { + testOneFileUsingDFSStripedInputStream(src, fileLength, false); + } + + private void testOneFileUsingDFSStripedInputStream(String src, int fileLength, + boolean withDataNodeFailure) throws IOException { + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); + Path srcPath = new Path(src); + DFSTestUtil.writeFile(fs, srcPath, new String(expected)); + + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); + + if (withDataNodeFailure) { + int dnIndex = 1; // TODO: StripedFileTestUtil.random.nextInt(dataBlocks); + LOG.info("stop DataNode " + dnIndex); + stopDataNode(srcPath, dnIndex); + } + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + largeBuf); + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + ByteBuffer.allocate(fileLength + 100)); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + smallBuf); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + ByteBuffer.allocate(1024)); + } + + private void stopDataNode(Path path, int failedDNIdx) + throws IOException { + BlockLocation[] locs = fs.getFileBlockLocations(path, 0, cellSize); + if (locs != null && locs.length > 0) { + String name = (locs[0].getNames())[failedDNIdx]; + for (DataNode dn : cluster.getDataNodes()) { + int port = dn.getXferPort(); + if (name.contains(Integer.toString(port))) { + dn.shutdown(); + break; + } + } + } + } + + @Test + public void testWriteReadUsingWebHdfs() throws Exception { + int fileLength = blockSize * dataBlocks + cellSize + 123; + + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); + FileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf, + WebHdfsConstants.WEBHDFS_SCHEME); + Path srcPath = new Path("/testWriteReadUsingWebHdfs"); + DFSTestUtil.writeFile(fs, srcPath, new String(expected)); + + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + // TODO: HDFS-8797 + //StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, largeBuf); + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, smallBuf); + // webhdfs doesn't support bytebuffer read + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java new file mode 100644 index 0000000..3679c5f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteStripedFileWithFailure.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks; + +public class TestWriteStripedFileWithFailure { + public static final Log LOG = LogFactory + .getLog(TestWriteStripedFileWithFailure.class); + private static MiniDFSCluster cluster; + private static FileSystem fs; + private static Configuration conf = new HdfsConfiguration(); + private final int smallFileLength = blockSize * dataBlocks - 123; + private final int largeFileLength = blockSize * dataBlocks + 123; + private final int[] fileLengths = {smallFileLength, largeFileLength}; + + public void setup() throws IOException { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient().createErasureCodingZone("/", + null, cellSize); + fs = cluster.getFileSystem(); + } + + public void tearDown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + // Test writing file with some Datanodes failure + @Test(timeout = 300000) + public void testWriteStripedFileWithDNFailure() throws IOException { + for (int fileLength : fileLengths) { + for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) { + for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) { + try { + // setup a new cluster with no dead datanode + setup(); + writeFileWithDNFailure(fileLength, dataDelNum, parityDelNum); + } catch (IOException ioe) { + String fileType = fileLength < (blockSize * dataBlocks) ? + "smallFile" : "largeFile"; + LOG.error("Failed to write file with DN failure:" + + " fileType = "+ fileType + + ", dataDelNum = " + dataDelNum + + ", parityDelNum = " + parityDelNum); + throw ioe; + } finally { + // tear down the cluster + tearDown(); + } + } + } + } + } + + /** + * Test writing a file with shutting down some DNs(data DNs or parity DNs or both). + * @param fileLength file length + * @param dataDNFailureNum the shutdown number of data DNs + * @param parityDNFailureNum the shutdown number of parity DNs + * @throws IOException + */ + private void writeFileWithDNFailure(int fileLength, + int dataDNFailureNum, int parityDNFailureNum) throws IOException { + String fileType = fileLength < (blockSize * dataBlocks) ? + "smallFile" : "largeFile"; + String src = "/dnFailure_" + dataDNFailureNum + "_" + parityDNFailureNum + + "_" + fileType; + LOG.info("writeFileWithDNFailure: file = " + src + + ", fileType = " + fileType + + ", dataDNFailureNum = " + dataDNFailureNum + + ", parityDNFailureNum = " + parityDNFailureNum); + + Path srcPath = new Path(src); + final AtomicInteger pos = new AtomicInteger(); + final FSDataOutputStream out = fs.create(srcPath); + final DFSStripedOutputStream stripedOut + = (DFSStripedOutputStream)out.getWrappedStream(); + + int[] dataDNFailureIndices = StripedFileTestUtil.randomArray(0, dataBlocks, + dataDNFailureNum); + Assert.assertNotNull(dataDNFailureIndices); + int[] parityDNFailureIndices = StripedFileTestUtil.randomArray(dataBlocks, + dataBlocks + parityBlocks, parityDNFailureNum); + Assert.assertNotNull(parityDNFailureIndices); + + int[] failedDataNodes = new int[dataDNFailureNum + parityDNFailureNum]; + System.arraycopy(dataDNFailureIndices, 0, failedDataNodes, + 0, dataDNFailureIndices.length); + System.arraycopy(parityDNFailureIndices, 0, failedDataNodes, + dataDNFailureIndices.length, parityDNFailureIndices.length); + + final int killPos = fileLength/2; + for (; pos.get() < fileLength; ) { + final int i = pos.getAndIncrement(); + if (i == killPos) { + for(int failedDn : failedDataNodes) { + StripedFileTestUtil.killDatanode(cluster, stripedOut, failedDn, pos); + } + } + write(out, i); + } + out.close(); + + // make sure the expected number of Datanode have been killed + int dnFailureNum = dataDNFailureNum + parityDNFailureNum; + Assert.assertEquals(cluster.getDataNodes().size(), numDNs - dnFailureNum); + + byte[] smallBuf = new byte[1024]; + byte[] largeBuf = new byte[fileLength + 100]; + final byte[] expected = StripedFileTestUtil.generateBytes(fileLength); + StripedFileTestUtil.verifyLength(fs, srcPath, fileLength); + StripedFileTestUtil.verifySeek(fs, srcPath, fileLength); + StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected, + smallBuf); + StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf); + + // delete the file + fs.delete(srcPath, true); + } + + void write(FSDataOutputStream out, int i) throws IOException { + try { + out.write(StripedFileTestUtil.getByte(i)); + } catch (IOException e) { + throw new IOException("Failed at i=" + i, e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java index 9f8aef5..e944b81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestLayoutVersion.java @@ -126,7 +126,8 @@ public class TestLayoutVersion { EnumSet<NameNodeLayoutVersion.Feature> compatibleFeatures = EnumSet.of( NameNodeLayoutVersion.Feature.TRUNCATE, NameNodeLayoutVersion.Feature.APPEND_NEW_BLOCK, - NameNodeLayoutVersion.Feature.QUOTA_BY_STORAGE_TYPE); + NameNodeLayoutVersion.Feature.QUOTA_BY_STORAGE_TYPE, + NameNodeLayoutVersion.Feature.ERASURE_CODING); for (LayoutFeature f : compatibleFeatures) { assertEquals(String.format("Expected minimum compatible layout version " + "%d for feature %s.", baseLV, f), baseLV, http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index c7233bd..3675e63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import org.apache.hadoop.fs.permission.AclEntry; @@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; @@ -63,15 +65,21 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -80,6 +88,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -181,40 +190,58 @@ public class TestPBHelper { assertEquals(b, b2); } - private static BlockWithLocations getBlockWithLocations(int bid) { + private static BlockWithLocations getBlockWithLocations( + int bid, boolean isStriped) { final String[] datanodeUuids = {"dn1", "dn2", "dn3"}; final String[] storageIDs = {"s1", "s2", "s3"}; final StorageType[] storageTypes = { StorageType.DISK, StorageType.DISK, StorageType.DISK}; - return new BlockWithLocations(new Block(bid, 0, 1), + final byte[] indices = {0, 1, 2}; + final short dataBlkNum = 6; + BlockWithLocations blkLocs = new BlockWithLocations(new Block(bid, 0, 1), datanodeUuids, storageIDs, storageTypes); + if (isStriped) { + blkLocs = new StripedBlockWithLocations(blkLocs, indices, dataBlkNum); + } + return blkLocs; } private void compare(BlockWithLocations locs1, BlockWithLocations locs2) { assertEquals(locs1.getBlock(), locs2.getBlock()); assertTrue(Arrays.equals(locs1.getStorageIDs(), locs2.getStorageIDs())); + if (locs1 instanceof StripedBlockWithLocations) { + assertTrue(Arrays.equals(((StripedBlockWithLocations) locs1).getIndices(), + ((StripedBlockWithLocations) locs2).getIndices())); + } } @Test public void testConvertBlockWithLocations() { - BlockWithLocations locs = getBlockWithLocations(1); - BlockWithLocationsProto locsProto = PBHelper.convert(locs); - BlockWithLocations locs2 = PBHelper.convert(locsProto); - compare(locs, locs2); + boolean[] testSuite = new boolean[]{false, true}; + for (int i = 0; i < testSuite.length; i++) { + BlockWithLocations locs = getBlockWithLocations(1, testSuite[i]); + BlockWithLocationsProto locsProto = PBHelper.convert(locs); + BlockWithLocations locs2 = PBHelper.convert(locsProto); + compare(locs, locs2); + } } @Test public void testConvertBlocksWithLocations() { - BlockWithLocations[] list = new BlockWithLocations[] { - getBlockWithLocations(1), getBlockWithLocations(2) }; - BlocksWithLocations locs = new BlocksWithLocations(list); - BlocksWithLocationsProto locsProto = PBHelper.convert(locs); - BlocksWithLocations locs2 = PBHelper.convert(locsProto); - BlockWithLocations[] blocks = locs.getBlocks(); - BlockWithLocations[] blocks2 = locs2.getBlocks(); - assertEquals(blocks.length, blocks2.length); - for (int i = 0; i < blocks.length; i++) { - compare(blocks[i], blocks2[i]); + boolean[] testSuite = new boolean[]{false, true}; + for (int i = 0; i < testSuite.length; i++) { + BlockWithLocations[] list = new BlockWithLocations[]{ + getBlockWithLocations(1, testSuite[i]), + getBlockWithLocations(2, testSuite[i])}; + BlocksWithLocations locs = new BlocksWithLocations(list); + BlocksWithLocationsProto locsProto = PBHelper.convert(locs); + BlocksWithLocations locs2 = PBHelper.convert(locsProto); + BlockWithLocations[] blocks = locs.getBlocks(); + BlockWithLocations[] blocks2 = locs2.getBlocks(); + assertEquals(blocks.length, blocks2.length); + for (int j = 0; j < blocks.length; j++) { + compare(blocks[j], blocks2[j]); + } } } @@ -489,16 +516,16 @@ public class TestPBHelper { @Test public void testConvertLocatedBlock() { LocatedBlock lb = createLocatedBlock(); - LocatedBlockProto lbProto = PBHelper.convert(lb); - LocatedBlock lb2 = PBHelper.convert(lbProto); + LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb); + LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto); compare(lb,lb2); } @Test public void testConvertLocatedBlockNoStorageMedia() { LocatedBlock lb = createLocatedBlockNoStorageMedia(); - LocatedBlockProto lbProto = PBHelper.convert(lb); - LocatedBlock lb2 = PBHelper.convert(lbProto); + LocatedBlockProto lbProto = PBHelper.convertLocatedBlock(lb); + LocatedBlock lb2 = PBHelper.convertLocatedBlockProto(lbProto); compare(lb,lb2); } @@ -508,8 +535,8 @@ public class TestPBHelper { for (int i=0;i<3;i++) { lbl.add(createLocatedBlock()); } - List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlock2(lbl); - List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlock(lbpl); + List<LocatedBlockProto> lbpl = PBHelper.convertLocatedBlocks2(lbl); + List<LocatedBlock> lbl2 = PBHelper.convertLocatedBlocks(lbpl); assertEquals(lbl.size(), lbl2.size()); for (int i=0;i<lbl.size();i++) { compare(lbl.get(i), lbl2.get(2)); @@ -522,8 +549,8 @@ public class TestPBHelper { for (int i=0;i<3;i++) { lbl[i] = createLocatedBlock(); } - LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlock(lbl); - LocatedBlock [] lbl2 = PBHelper.convertLocatedBlock(lbpl); + LocatedBlockProto [] lbpl = PBHelper.convertLocatedBlocks(lbl); + LocatedBlock [] lbl2 = PBHelper.convertLocatedBlocks(lbpl); assertEquals(lbl.length, lbl2.length); for (int i=0;i<lbl.length;i++) { compare(lbl[i], lbl2[i]); @@ -639,4 +666,99 @@ public class TestPBHelper { .build(); Assert.assertEquals(s, PBHelper.convert(PBHelper.convert(s))); } + + @Test + public void testBlockECRecoveryCommand() { + DatanodeInfo[] dnInfos0 = new DatanodeInfo[] { + DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() }; + DatanodeStorageInfo targetDnInfos_0 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s00")); + DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s01")); + DatanodeStorageInfo[] targetDnInfos0 = new DatanodeStorageInfo[] { + targetDnInfos_0, targetDnInfos_1 }; + short[] liveBlkIndices0 = new short[2]; + BlockECRecoveryInfo blkECRecoveryInfo0 = new BlockECRecoveryInfo( + new ExtendedBlock("bp1", 1234), dnInfos0, targetDnInfos0, + liveBlkIndices0, ErasureCodingSchemaManager.getSystemDefaultSchema(), + 64 * 1024); + DatanodeInfo[] dnInfos1 = new DatanodeInfo[] { + DFSTestUtil.getLocalDatanodeInfo(), DFSTestUtil.getLocalDatanodeInfo() }; + DatanodeStorageInfo targetDnInfos_2 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s02")); + DatanodeStorageInfo targetDnInfos_3 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s03")); + DatanodeStorageInfo[] targetDnInfos1 = new DatanodeStorageInfo[] { + targetDnInfos_2, targetDnInfos_3 }; + short[] liveBlkIndices1 = new short[2]; + BlockECRecoveryInfo blkECRecoveryInfo1 = new BlockECRecoveryInfo( + new ExtendedBlock("bp2", 3256), dnInfos1, targetDnInfos1, + liveBlkIndices1, ErasureCodingSchemaManager.getSystemDefaultSchema(), + 64 * 1024); + List<BlockECRecoveryInfo> blkRecoveryInfosList = new ArrayList<BlockECRecoveryInfo>(); + blkRecoveryInfosList.add(blkECRecoveryInfo0); + blkRecoveryInfosList.add(blkECRecoveryInfo1); + BlockECRecoveryCommand blkECRecoveryCmd = new BlockECRecoveryCommand( + DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, blkRecoveryInfosList); + BlockECRecoveryCommandProto blkECRecoveryCmdProto = PBHelper + .convert(blkECRecoveryCmd); + blkECRecoveryCmd = PBHelper.convert(blkECRecoveryCmdProto); + Iterator<BlockECRecoveryInfo> iterator = blkECRecoveryCmd.getECTasks() + .iterator(); + assertBlockECRecoveryInfoEquals(blkECRecoveryInfo0, iterator.next()); + assertBlockECRecoveryInfoEquals(blkECRecoveryInfo1, iterator.next()); + } + + private void assertBlockECRecoveryInfoEquals( + BlockECRecoveryInfo blkECRecoveryInfo1, + BlockECRecoveryInfo blkECRecoveryInfo2) { + assertEquals(blkECRecoveryInfo1.getExtendedBlock(), + blkECRecoveryInfo2.getExtendedBlock()); + + DatanodeInfo[] sourceDnInfos1 = blkECRecoveryInfo1.getSourceDnInfos(); + DatanodeInfo[] sourceDnInfos2 = blkECRecoveryInfo2.getSourceDnInfos(); + assertDnInfosEqual(sourceDnInfos1, sourceDnInfos2); + + DatanodeInfo[] targetDnInfos1 = blkECRecoveryInfo1.getTargetDnInfos(); + DatanodeInfo[] targetDnInfos2 = blkECRecoveryInfo2.getTargetDnInfos(); + assertDnInfosEqual(targetDnInfos1, targetDnInfos2); + + String[] targetStorageIDs1 = blkECRecoveryInfo1.getTargetStorageIDs(); + String[] targetStorageIDs2 = blkECRecoveryInfo2.getTargetStorageIDs(); + assertEquals(targetStorageIDs1.length, targetStorageIDs2.length); + for (int i = 0; i < targetStorageIDs1.length; i++) { + assertEquals(targetStorageIDs1[i], targetStorageIDs2[i]); + } + + short[] liveBlockIndices1 = blkECRecoveryInfo1.getLiveBlockIndices(); + short[] liveBlockIndices2 = blkECRecoveryInfo2.getLiveBlockIndices(); + for (int i = 0; i < liveBlockIndices1.length; i++) { + assertEquals(liveBlockIndices1[i], liveBlockIndices2[i]); + } + + ECSchema ecSchema1 = blkECRecoveryInfo1.getECSchema(); + ECSchema ecSchema2 = blkECRecoveryInfo2.getECSchema(); + // Compare ECSchemas same as default ECSchema as we used system default + // ECSchema used in this test + compareECSchemas(ErasureCodingSchemaManager.getSystemDefaultSchema(), ecSchema1); + compareECSchemas(ErasureCodingSchemaManager.getSystemDefaultSchema(), ecSchema2); + } + + private void compareECSchemas(ECSchema ecSchema1, ECSchema ecSchema2) { + assertEquals(ecSchema1.getSchemaName(), ecSchema2.getSchemaName()); + assertEquals(ecSchema1.getNumDataUnits(), ecSchema2.getNumDataUnits()); + assertEquals(ecSchema1.getNumParityUnits(), ecSchema2.getNumParityUnits()); + } + + private void assertDnInfosEqual(DatanodeInfo[] dnInfos1, + DatanodeInfo[] dnInfos2) { + assertEquals(dnInfos1.length, dnInfos2.length); + for (int i = 0; i < dnInfos1.length; i++) { + compare(dnInfos1[i], dnInfos2[i]); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 194aa0f..3c149d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; @@ -80,6 +81,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -144,6 +146,22 @@ public class TestBalancer { LazyPersistTestCase.initCacheManipulator(); } + int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + int groupSize = dataBlocks + parityBlocks; + private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final static int stripesPerBlock = 4; + static int DEFAULT_STRIPE_BLOCK_SIZE = cellSize * stripesPerBlock; + + static void initConfWithStripe(Configuration conf) { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_STRIPE_BLOCK_SIZE); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + SimulatedFSDataset.setFactory(conf); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L); + conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L); + } + /* create a file with a length of <code>fileLen</code> */ static void createFile(MiniDFSCluster cluster, Path filePath, long fileLen, short replicationFactor, int nnIndex) @@ -1529,6 +1547,75 @@ public class TestBalancer { } } + public void integrationTestWithStripedFile(Configuration conf) throws Exception { + initConfWithStripe(conf); + doTestBalancerWithStripedFile(conf); + } + + @Test(timeout = 100000) + public void testBalancerWithStripedFile() throws Exception { + Configuration conf = new Configuration(); + initConfWithStripe(conf); + doTestBalancerWithStripedFile(conf); + } + + private void doTestBalancerWithStripedFile(Configuration conf) throws Exception { + int numOfDatanodes = dataBlocks + parityBlocks + 2; + int numOfRacks = dataBlocks; + long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE; + long[] capacities = new long[numOfDatanodes]; + for (int i = 0; i < capacities.length; i++) { + capacities[i] = capacity; + } + String[] racks = new String[numOfDatanodes]; + for (int i = 0; i < numOfDatanodes; i++) { + racks[i] = "/rack" + (i % numOfRacks); + } + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .racks(racks) + .simulatedCapacities(capacities) + .build(); + + try { + cluster.waitActive(); + client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), + ClientProtocol.class).getProxy(); + client.createErasureCodingZone("/", null, 0); + + long totalCapacity = sum(capacities); + + // fill up the cluster with 30% data. It'll be 45% full plus parity. + long fileLen = totalCapacity * 3 / 10; + long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks; + FileSystem fs = cluster.getFileSystem(0); + DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong()); + + // verify locations of striped blocks + LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); + DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); + + // add one datanode + String newRack = "/rack" + (++numOfRacks); + cluster.startDataNodes(conf, 1, true, null, + new String[]{newRack}, null, new long[]{capacity}); + totalCapacity += capacity; + cluster.triggerHeartbeats(); + + // run balancer and validate results + Balancer.Parameters p = Balancer.Parameters.DEFAULT; + Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); + runBalancer(conf, totalUsedSpace, totalCapacity, p, 0); + + // verify locations of striped blocks + locatedBlocks = client.getBlockLocations(fileName, 0, fileLen); + DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize); + + } finally { + cluster.shutdown(); + } + } + /** * @param args */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 148135b..64d80bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -161,7 +161,7 @@ public class BlockManagerTestUtil { */ public static int computeAllPendingWork(BlockManager bm) { int work = computeInvalidationWork(bm); - work += bm.computeReplicationWork(Integer.MAX_VALUE); + work += bm.computeBlockRecoveryWork(Integer.MAX_VALUE); return work; } @@ -306,4 +306,12 @@ public class BlockManagerTestUtil { throws ExecutionException, InterruptedException { dm.getDecomManager().runMonitor(); } + + /** + * add block to the replicateBlocks queue of the Datanode + */ + public static void addBlockToBeReplicated(DatanodeDescriptor node, + Block block, DatanodeStorageInfo[] targets) { + node.addBlockToBeReplicated(block, targets); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java index 5126aa7..bae4f1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java @@ -63,7 +63,7 @@ public class TestBlockInfo { final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1"); - boolean added = blockInfo.addStorage(storage); + boolean added = blockInfo.addStorage(storage, blockInfo); Assert.assertTrue(added); Assert.assertEquals(storage, blockInfo.getStorageInfo(0)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java new file mode 100644 index 0000000..6788770 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingSchemaManager; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Test {@link BlockInfoStriped} + */ +public class TestBlockInfoStriped { + private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + private static final long BASE_ID = -1600; + private static final Block baseBlock = new Block(BASE_ID); + private static final ECSchema testSchema + = ErasureCodingSchemaManager.getSystemDefaultSchema(); + private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final BlockInfoStriped info = new BlockInfoStriped(baseBlock, + testSchema, cellSize); + + private Block[] createReportedBlocks(int num) { + Block[] blocks = new Block[num]; + for (int i = 0; i < num; i++) { + blocks[i] = new Block(BASE_ID + i); + } + return blocks; + } + + /** + * Test adding storage and reported block + */ + @Test + public void testAddStorage() { + // first add NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS storages, i.e., a complete + // group of blocks/storages + DatanodeStorageInfo[] storageInfos = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS); + Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); + int i = 0; + for (; i < storageInfos.length; i += 2) { + info.addStorage(storageInfos[i], blocks[i]); + Assert.assertEquals(i/2 + 1, info.numNodes()); + } + i /= 2; + for (int j = 1; j < storageInfos.length; j += 2) { + Assert.assertTrue(info.addStorage(storageInfos[j], blocks[j])); + Assert.assertEquals(i + (j+1)/2, info.numNodes()); + } + + // check + byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length); + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + int index = info.findStorageInfo(storage); + Assert.assertEquals(i++, index); + Assert.assertEquals(index, indices[index]); + } + + // the same block is reported from the same storage twice + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + Assert.assertTrue(info.addStorage(storage, blocks[i++])); + } + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length); + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + int index = info.findStorageInfo(storage); + Assert.assertEquals(i++, index); + Assert.assertEquals(index, indices[index]); + } + + // the same block is reported from another storage + DatanodeStorageInfo[] storageInfos2 = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS * 2); + // only add the second half of info2 + for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) { + info.addStorage(storageInfos2[i], blocks[i % TOTAL_NUM_BLOCKS]); + Assert.assertEquals(i + 1, info.getCapacity()); + Assert.assertEquals(i + 1, info.numNodes()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(i + 1, indices.length); + } + for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) { + int index = info.findStorageInfo(storageInfos2[i]); + Assert.assertEquals(i++, index); + Assert.assertEquals(index - TOTAL_NUM_BLOCKS, indices[index]); + } + } + + @Test + public void testRemoveStorage() { + // first add TOTAL_NUM_BLOCKS into the BlockInfoStriped + DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS); + Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); + for (int i = 0; i < storages.length; i++) { + info.addStorage(storages[i], blocks[i]); + } + + // remove two storages + info.removeStorage(storages[0]); + info.removeStorage(storages[2]); + + // check + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes()); + byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices"); + for (int i = 0; i < storages.length; i++) { + int index = info.findStorageInfo(storages[i]); + if (i != 0 && i != 2) { + Assert.assertEquals(i, index); + Assert.assertEquals(index, indices[index]); + } else { + Assert.assertEquals(-1, index); + Assert.assertEquals(-1, indices[i]); + } + } + + // the same block is reported from another storage + DatanodeStorageInfo[] storages2 = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS * 2); + for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) { + info.addStorage(storages2[i], blocks[i % TOTAL_NUM_BLOCKS]); + } + // now we should have 8 storages + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length); + int j = TOTAL_NUM_BLOCKS; + for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) { + int index = info.findStorageInfo(storages2[i]); + if (i == TOTAL_NUM_BLOCKS || i == TOTAL_NUM_BLOCKS + 2) { + Assert.assertEquals(i - TOTAL_NUM_BLOCKS, index); + } else { + Assert.assertEquals(j++, index); + } + } + + // remove the storages from storages2 + for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) { + info.removeStorage(storages2[i + TOTAL_NUM_BLOCKS]); + } + // now we should have 3 storages + Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length); + for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) { + if (i == 0 || i == 2) { + int index = info.findStorageInfo(storages2[i + TOTAL_NUM_BLOCKS]); + Assert.assertEquals(-1, index); + } else { + int index = info.findStorageInfo(storages[i]); + Assert.assertEquals(i, index); + } + } + for (int i = TOTAL_NUM_BLOCKS; i < TOTAL_NUM_BLOCKS * 2 - 2; i++) { + Assert.assertEquals(-1, indices[i]); + Assert.assertNull(info.getDatanode(i)); + } + } + + @Test + public void testReplaceBlock() { + DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS); + Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); + // add block/storage 0, 2, 4 into the BlockInfoStriped + for (int i = 0; i < storages.length; i += 2) { + Assert.assertEquals(AddBlockResult.ADDED, + storages[i].addBlock(info, blocks[i])); + } + + BlockInfoStriped newBlockInfo = new BlockInfoStriped(info); + info.replaceBlock(newBlockInfo); + + // make sure the newBlockInfo is correct + byte[] indices = (byte[]) Whitebox.getInternalState(newBlockInfo, "indices"); + for (int i = 0; i < storages.length; i += 2) { + int index = newBlockInfo.findStorageInfo(storages[i]); + Assert.assertEquals(i, index); + Assert.assertEquals(index, indices[i]); + + // make sure the newBlockInfo is added to the linked list of the storage + Assert.assertSame(newBlockInfo, storages[i].getBlockListHeadForTesting()); + Assert.assertEquals(1, storages[i].numBlocks()); + Assert.assertNull(newBlockInfo.getNext()); + } + } + + @Test + public void testWrite() { + long blkID = 1; + long numBytes = 1; + long generationStamp = 1; + ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE * 3); + byteBuffer.putLong(blkID).putLong(numBytes).putLong(generationStamp); + + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(byteStream); + BlockInfoStriped blk = new BlockInfoStriped(new Block(blkID, numBytes, + generationStamp), testSchema, cellSize); + + try { + blk.write(out); + } catch(Exception ex) { + fail("testWrite error:" + ex.getMessage()); + } + assertEquals(byteBuffer.array().length, byteStream.toByteArray().length); + assertArrayEquals(byteBuffer.array(), byteStream.toByteArray()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java index a7ba293..a447aaf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java @@ -23,7 +23,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import org.apache.hadoop.util.Time; import org.junit.Test; /** @@ -51,7 +50,7 @@ public class TestBlockInfoUnderConstruction { DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000); DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000); blockInfo.initializeBlockRecovery(1); - BlockInfoContiguousUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1); + BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1); assertEquals(blockInfoRecovery[0], blockInfo); // Recovery attempt #2. http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc2564af/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 2d3d90a..66a4681 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -383,7 +383,7 @@ public class TestBlockManager { for (int i = 1; i < pipeline.length; i++) { DatanodeStorageInfo storage = pipeline[i]; bm.addBlock(storage, blockInfo, null); - blockInfo.addStorage(storage); + blockInfo.addStorage(storage, blockInfo); } } @@ -393,7 +393,7 @@ public class TestBlockManager { for (DatanodeDescriptor dn : nodes) { for (DatanodeStorageInfo storage : dn.getStorageInfos()) { - blockInfo.addStorage(storage); + blockInfo.addStorage(storage, blockInfo); } } return blockInfo; @@ -453,8 +453,8 @@ public class TestBlockManager { assertEquals("Block not initially pending replication", 0, bm.pendingReplications.getNumReplicas(block)); assertEquals( - "computeReplicationWork should indicate replication is needed", 1, - bm.computeReplicationWorkForBlocks(list_all)); + "computeBlockRecoveryWork should indicate replication is needed", 1, + bm.computeRecoveryWorkForBlocks(list_all)); assertTrue("replication is pending after work is computed", bm.pendingReplications.getNumReplicas(block) > 0); @@ -508,35 +508,38 @@ public class TestBlockManager { assertNotNull("Chooses source node for a highest-priority replication" + " even if all available source nodes have reached their replication" + " limits below the hard limit.", - bm.chooseSourceDatanode( - aBlock, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)); - - assertNull("Does not choose a source node for a less-than-highest-priority" - + " replication since all available source nodes have reached" - + " their replication limits.", - bm.chooseSourceDatanode( - aBlock, + new ArrayList<Short>(), + UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); + + assertEquals("Does not choose a source node for a less-than-highest-priority" + + " replication since all available source nodes have reached" + + " their replication limits.", 0, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED)); + new ArrayList<Short>(), + UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length); // Increase the replication count to test replication count > hard limit DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] }; origNodes.get(0).addBlockToBeReplicated(aBlock, targets); - assertNull("Does not choose a source node for a highest-priority" - + " replication when all available nodes exceed the hard limit.", - bm.chooseSourceDatanode( - aBlock, + assertEquals("Does not choose a source node for a highest-priority" + + " replication when all available nodes exceed the hard limit.", 0, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)); + new ArrayList<Short>(), + UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length); } @Test @@ -557,30 +560,28 @@ public class TestBlockManager { assertNotNull("Chooses decommissioning source node for a normal replication" + " if all available source nodes have reached their replication" + " limits below the hard limit.", - bm.chooseSourceDatanode( - aBlock, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, - new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)); + new NumberReplicas(), new LinkedList<Short>(), + UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]); // Increase the replication count to test replication count > hard limit DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] }; origNodes.get(0).addBlockToBeReplicated(aBlock, targets); - assertNull("Does not choose a source decommissioning node for a normal" - + " replication when all available nodes exceed the hard limit.", - bm.chooseSourceDatanode( - aBlock, + assertEquals("Does not choose a source decommissioning node for a normal" + + " replication when all available nodes exceed the hard limit.", 0, + bm.chooseSourceDatanodes( + bm.getStoredBlock(aBlock), cntNodes, liveNodes, - new NumberReplicas(), - UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)); + new NumberReplicas(), new LinkedList<Short>(), + UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length); } - - @Test public void testSafeModeIBR() throws Exception { DatanodeDescriptor node = spy(nodes.get(0));
