http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java new file mode 100644 index 0000000..52626e1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSStripedOutputStream; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; +import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.io.IOUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; 
+import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.junit.Assert.assertEquals; + +public class TestAddStripedBlocks { + private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS + + HdfsConstants.NUM_PARITY_BLOCKS; + + private MiniDFSCluster cluster; + private DistributedFileSystem dfs; + + @Before + public void setup() throws IOException { + cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()) + .numDataNodes(GROUP_SIZE).build(); + cluster.waitActive(); + dfs = cluster.getFileSystem(); + dfs.getClient().createErasureCodingZone("/", null, 0); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * Make sure the IDs of striped blocks do not conflict + */ + @Test + public void testAllocateBlockId() throws Exception { + Path testPath = new Path("/testfile"); + // create a file, which allocates a new block + DFSTestUtil.writeFile(dfs, testPath, "hello, world!"); + LocatedBlocks lb = dfs.getClient().getLocatedBlocks(testPath.toString(), 0); + final long firstId = lb.get(0).getBlock().getBlockId(); + // delete the file + dfs.delete(testPath, true); + + // allocate a new block, and make sure the new block's id does not conflict + // with the previous one + DFSTestUtil.writeFile(dfs, testPath, "hello again"); + lb = dfs.getClient().getLocatedBlocks(testPath.toString(), 0); + final long secondId = lb.get(0).getBlock().getBlockId(); + Assert.assertEquals(firstId + HdfsServerConstants.MAX_BLOCKS_IN_GROUP, secondId); + } + + private static void writeAndFlushStripedOutputStream( + DFSStripedOutputStream out, int chunkSize) throws IOException { + // FSOutputSummer.BUFFER_NUM_CHUNKS == 9 + byte[] toWrite = new byte[chunkSize * 
9 + 1]; + out.write(toWrite); + DFSTestUtil.flushInternal(out); + } + + @Test (timeout=60000) + public void testAddStripedBlock() throws Exception { + final Path file = new Path("/file1"); + // create an empty file + FSDataOutputStream out = null; + try { + out = dfs.create(file, (short) 1); + writeAndFlushStripedOutputStream( + (DFSStripedOutputStream) out.getWrappedStream(), + DFS_BYTES_PER_CHECKSUM_DEFAULT); + + FSDirectory fsdir = cluster.getNamesystem().getFSDirectory(); + INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); + + BlockInfo[] blocks = fileNode.getBlocks(); + assertEquals(1, blocks.length); + Assert.assertTrue(blocks[0].isStriped()); + + checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), true); + + // restart NameNode to check editlog + cluster.restartNameNode(true); + fsdir = cluster.getNamesystem().getFSDirectory(); + fileNode = fsdir.getINode4Write(file.toString()).asFile(); + blocks = fileNode.getBlocks(); + assertEquals(1, blocks.length); + Assert.assertTrue(blocks[0].isStriped()); + checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), false); + + // save namespace, restart namenode, and check + dfs = cluster.getFileSystem(); + dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); + dfs.saveNamespace(); + dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE); + cluster.restartNameNode(true); + fsdir = cluster.getNamesystem().getFSDirectory(); + fileNode = fsdir.getINode4Write(file.toString()).asFile(); + blocks = fileNode.getBlocks(); + assertEquals(1, blocks.length); + Assert.assertTrue(blocks[0].isStriped()); + checkStripedBlockUC((BlockInfoStriped) fileNode.getLastBlock(), false); + } finally { + IOUtils.cleanup(null, out); + } + } + + private void checkStripedBlockUC(BlockInfoStriped block, + boolean checkReplica) { + assertEquals(0, block.numNodes()); + Assert.assertFalse(block.isComplete()); + Assert.assertEquals(HdfsConstants.NUM_DATA_BLOCKS, block.getDataBlockNum()); + 
Assert.assertEquals(HdfsConstants.NUM_PARITY_BLOCKS, + block.getParityBlockNum()); + Assert.assertEquals(0, + block.getBlockId() & HdfsServerConstants.BLOCK_GROUP_INDEX_MASK); + + final BlockInfoStripedUnderConstruction blockUC = + (BlockInfoStripedUnderConstruction) block; + Assert.assertEquals(HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, + blockUC.getBlockUCState()); + if (checkReplica) { + Assert.assertEquals(GROUP_SIZE, blockUC.getNumExpectedLocations()); + DatanodeStorageInfo[] storages = blockUC.getExpectedStorageLocations(); + for (DataNode dn : cluster.getDataNodes()) { + Assert.assertTrue(includeDataNode(dn.getDatanodeId(), storages)); + } + } + } + + private boolean includeDataNode(DatanodeID dn, DatanodeStorageInfo[] storages) { + for (DatanodeStorageInfo storage : storages) { + if (storage.getDatanodeDescriptor().equals(dn)) { + return true; + } + } + return false; + } + + @Test + public void testGetLocatedStripedBlocks() throws Exception { + final Path file = new Path("/file1"); + // create an empty file + FSDataOutputStream out = null; + try { + out = dfs.create(file, (short) 1); + writeAndFlushStripedOutputStream( + (DFSStripedOutputStream) out.getWrappedStream(), + DFS_BYTES_PER_CHECKSUM_DEFAULT); + + FSDirectory fsdir = cluster.getNamesystem().getFSDirectory(); + INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); + BlockInfoStripedUnderConstruction lastBlk = + (BlockInfoStripedUnderConstruction) fileNode.getLastBlock(); + DatanodeInfo[] expectedDNs = DatanodeStorageInfo + .toDatanodeInfos(lastBlk.getExpectedStorageLocations()); + int[] indices = lastBlk.getBlockIndices(); + + LocatedBlocks blks = dfs.getClient().getLocatedBlocks(file.toString(), 0L); + Assert.assertEquals(1, blks.locatedBlockCount()); + LocatedBlock lblk = blks.get(0); + + Assert.assertTrue(lblk instanceof LocatedStripedBlock); + DatanodeInfo[] datanodes = lblk.getLocations(); + int[] blockIndices = ((LocatedStripedBlock) lblk).getBlockIndices(); + 
Assert.assertEquals(GROUP_SIZE, datanodes.length); + Assert.assertEquals(GROUP_SIZE, blockIndices.length); + Assert.assertArrayEquals(indices, blockIndices); + Assert.assertArrayEquals(expectedDNs, datanodes); + } finally { + IOUtils.cleanup(null, out); + } + } + + /** + * Test BlockInfoStripedUnderConstruction#addReplicaIfNotPresent in different + * scenarios. + */ + @Test + public void testAddUCReplica() throws Exception { + final Path file = new Path("/file1"); + final List<String> storageIDs = new ArrayList<>(); + // create an empty file + FSDataOutputStream out = null; + try { + out = dfs.create(file, (short) 1); + + // 1. create the UC striped block + FSDirectory fsdir = cluster.getNamesystem().getFSDirectory(); + INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile(); + cluster.getNamesystem().getAdditionalBlock(file.toString(), + fileNode.getId(), dfs.getClient().getClientName(), null, null, null); + BlockInfo lastBlock = fileNode.getLastBlock(); + BlockInfoStripedUnderConstruction ucBlock = + (BlockInfoStripedUnderConstruction) lastBlock; + + DatanodeStorageInfo[] locs = ucBlock.getExpectedStorageLocations(); + int[] indices = ucBlock.getBlockIndices(); + Assert.assertEquals(GROUP_SIZE, locs.length); + Assert.assertEquals(GROUP_SIZE, indices.length); + + // 2. 
mimic incremental block reports and make sure the uc-replica list in + // the BlockInfoUCStriped is correct + int i = 0; + for (DataNode dn : cluster.getDataNodes()) { + final Block block = new Block(lastBlock.getBlockId() + i++, + 0, lastBlock.getGenerationStamp()); + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + storageIDs.add(storage.getStorageID()); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(block, BlockStatus.RECEIVING_BLOCK, + storage); + for (StorageReceivedDeletedBlocks report : reports) { + cluster.getNamesystem().processIncrementalBlockReport( + dn.getDatanodeId(), report); + } + } + + // make sure lastBlock is correct and the storages have been updated + locs = ucBlock.getExpectedStorageLocations(); + indices = ucBlock.getBlockIndices(); + Assert.assertEquals(GROUP_SIZE, locs.length); + Assert.assertEquals(GROUP_SIZE, indices.length); + for (DatanodeStorageInfo newstorage : locs) { + Assert.assertTrue(storageIDs.contains(newstorage.getStorageID())); + } + } finally { + IOUtils.cleanup(null, out); + } + + // 3. restart the namenode. 
mimic the full block reports and check the + // uc-replica list again + cluster.restartNameNode(true); + final String bpId = cluster.getNamesystem().getBlockPoolId(); + INodeFile fileNode = cluster.getNamesystem().getFSDirectory() + .getINode4Write(file.toString()).asFile(); + BlockInfo lastBlock = fileNode.getLastBlock(); + int i = GROUP_SIZE - 1; + for (DataNode dn : cluster.getDataNodes()) { + String storageID = storageIDs.get(i); + final Block block = new Block(lastBlock.getBlockId() + i--, + lastBlock.getGenerationStamp(), 0); + DatanodeStorage storage = new DatanodeStorage(storageID); + List<ReplicaBeingWritten> blocks = new ArrayList<>(); + ReplicaBeingWritten replica = new ReplicaBeingWritten(block, null, null, + null); + blocks.add(replica); + BlockListAsLongs bll = BlockListAsLongs.encode(blocks); + StorageBlockReport[] reports = {new StorageBlockReport(storage, + bll)}; + cluster.getNameNodeRpc().blockReport(dn.getDNRegistrationForBP(bpId), + bpId, reports, null); + } + + BlockInfoStripedUnderConstruction ucBlock = + (BlockInfoStripedUnderConstruction) lastBlock; + DatanodeStorageInfo[] locs = ucBlock.getExpectedStorageLocations(); + int[] indices = ucBlock.getBlockIndices(); + Assert.assertEquals(GROUP_SIZE, locs.length); + Assert.assertEquals(GROUP_SIZE, indices.length); + for (i = 0; i < GROUP_SIZE; i++) { + Assert.assertEquals(storageIDs.get(i), + locs[GROUP_SIZE - 1 - i].getStorageID()); + Assert.assertEquals(GROUP_SIZE - i - 1, indices[i]); + } + } + + @Test + public void testCheckStripedReplicaCorrupt() throws Exception { + final int numBlocks = 4; + final int numStripes = 4; + final Path filePath = new Path("/corrupt"); + final FSNamesystem ns = cluster.getNameNode().getNamesystem(); + final BlockManager bm = ns.getBlockManager(); + DFSTestUtil.createStripedFile(cluster, filePath, null, + numBlocks, numStripes, false); + + INodeFile fileNode = ns.getFSDirectory().getINode(filePath.toString()). 
+ asFile(); + Assert.assertTrue(fileNode.isStriped()); + BlockInfo stored = fileNode.getBlocks()[0]; + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(0, ns.getCorruptReplicaBlocks()); + + // Now send a block report with correct size + DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString()); + final Block reported = new Block(stored); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE); + StorageReceivedDeletedBlocks[] reports = DFSTestUtil + .makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(0).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(0, ns.getCorruptReplicaBlocks()); + + // Now send a block report with wrong size + reported.setBlockId(stored.getBlockId() + 1); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE - 1); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(1).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + + // Now send a parity block report with correct size + reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(2).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + + // Now send a parity block report with wrong size + reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS); + reported.setNumBytes(numStripes * 
BLOCK_STRIPED_CELL_SIZE + 1); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(3).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + // the total number of corrupted block info is still 1 + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + // 2 internal blocks corrupted + Assert.assertEquals(2, bm.getCorruptReplicas(stored).size()); + + // Now change the size of stored block, and test verifying the last + // block size + stored.setNumBytes(stored.getNumBytes() + 10); + reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS + 2); + reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(4).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + Assert.assertEquals(3, bm.getCorruptReplicas(stored).size()); + + // Now send a parity block report with correct size based on adjusted + // size of stored block + /** Now stored block has {@link numStripes} full stripes + a cell + 10 */ + stored.setNumBytes(stored.getNumBytes() + BLOCK_STRIPED_CELL_SIZE); + reported.setBlockId(stored.getBlockId()); + reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(0).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + Assert.assertEquals(3, bm.getCorruptReplicas(stored).size()); + + reported.setBlockId(stored.getBlockId() + 1); 
+ reported.setNumBytes(numStripes * BLOCK_STRIPED_CELL_SIZE + 10); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(5).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + Assert.assertEquals(3, bm.getCorruptReplicas(stored).size()); + + reported.setBlockId(stored.getBlockId() + NUM_DATA_BLOCKS); + reported.setNumBytes((numStripes + 1) * BLOCK_STRIPED_CELL_SIZE); + reports = DFSTestUtil.makeReportForReceivedBlock(reported, + ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage); + ns.processIncrementalBlockReport( + cluster.getDataNodes().get(2).getDatanodeId(), reports[0]); + BlockManagerTestUtil.updateState(ns.getBlockManager()); + Assert.assertEquals(1, ns.getCorruptReplicaBlocks()); + Assert.assertEquals(3, bm.getCorruptReplicas(stored).size()); + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index c5262d4..ec3d924 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -162,7 +162,8 @@ public class TestDeadDatanode { // choose the targets, but local node should not get selected as this is not // part of the cluster anymore DatanodeStorageInfo[] results = bm.chooseTarget4NewBlock("/hello", 3, - clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7); + clientNode, new HashSet<Node>(), 256 * 1024 * 1024L, null, (byte) 7, + false); for (DatanodeStorageInfo datanodeStorageInfo : results) { assertFalse("Dead node should not be choosen", datanodeStorageInfo .getDatanodeDescriptor().equals(clientNode)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 55ba379..6426b23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; @@ -39,15 +40,24 @@ import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.test.PathUtils; import org.apache.log4j.Level; import org.junit.Test; @@ -65,6 +75,9 @@ public class TestFSEditLogLoader { private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class); private static final int NUM_DATA_NODES = 0; + + private static final ECSchema testSchema + = 
ErasureCodingSchemaManager.getSystemDefaultSchema(); @Test public void testDisplayRecentEditLogOpCodes() throws IOException { @@ -414,4 +427,264 @@ public class TestFSEditLogLoader { fromByte(code), FSEditLogOpCodes.fromByte(code)); } } + + @Test + public void testAddNewStripedBlock() throws IOException{ + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) + .build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + FSNamesystem fns = cluster.getNamesystem(); + + String testDir = "/ec"; + String testFile = "testfile_001"; + String testFilePath = testDir + "/" + testFile; + String clientName = "testUser1"; + String clientMachine = "testMachine1"; + long blkId = 1; + long blkNumBytes = 1024; + long timestamp = 1426222918; + short blockNum = HdfsConstants.NUM_DATA_BLOCKS; + short parityNum = HdfsConstants.NUM_PARITY_BLOCKS; + int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + + //create an erasure coding zone on the directory + fs.mkdir(new Path(testDir), new FsPermission("755")); + fs.getClient().getNamenode().createErasureCodingZone(testDir, null, 0); + + // Create a file with striped block + Path p = new Path(testFilePath); + DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); + + fns.enterSafeMode(false); + fns.saveNamespace(0, 0); + fns.leaveSafeMode(); + + // Add a striped block to the file + BlockInfoStriped stripedBlk = new BlockInfoStriped( + new Block(blkId, blkNumBytes, timestamp), testSchema, cellSize); + INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); + file.toUnderConstruction(clientName, clientMachine); + file.addBlock(stripedBlk); + fns.getEditLog().logAddBlock(testFilePath, file); + file.toCompleteFile(System.currentTimeMillis()); + + //If the loaded block is the same as above, it means that + //we have successfully applied the edit log to the fsimage. 
+ cluster.restartNameNodes(); + cluster.waitActive(); + fns = cluster.getNamesystem(); + + INodeFile inodeLoaded = (INodeFile)fns.getFSDirectory() + .getINode(testFilePath); + + assertTrue(inodeLoaded.isStriped()); + + BlockInfo[] blks = inodeLoaded.getBlocks(); + assertEquals(1, blks.length); + assertEquals(blkId, blks[0].getBlockId()); + assertEquals(blkNumBytes, blks[0].getNumBytes()); + assertEquals(timestamp, blks[0].getGenerationStamp()); + assertEquals(blockNum, ((BlockInfoStriped)blks[0]).getDataBlockNum()); + assertEquals(parityNum, ((BlockInfoStriped)blks[0]).getParityBlockNum()); + assertEquals(cellSize, ((BlockInfoStriped)blks[0]).getCellSize()); + + cluster.shutdown(); + cluster = null; + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testUpdateStripedBlocks() throws IOException{ + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) + .build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + FSNamesystem fns = cluster.getNamesystem(); + + String testDir = "/ec"; + String testFile = "testfile_002"; + String testFilePath = testDir + "/" + testFile; + String clientName = "testUser2"; + String clientMachine = "testMachine2"; + long blkId = 1; + long blkNumBytes = 1024; + long timestamp = 1426222918; + short blockNum = HdfsConstants.NUM_DATA_BLOCKS; + short parityNum = HdfsConstants.NUM_PARITY_BLOCKS; + int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + + //create an erasure coding zone on the directory + fs.mkdir(new Path(testDir), new FsPermission("755")); + fs.getClient().getNamenode().createErasureCodingZone(testDir, null, 0); + + //create a file with striped blocks + Path p = new Path(testFilePath); + DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); + BlockInfoStriped stripedBlk = new BlockInfoStriped( + new Block(blkId, blkNumBytes, timestamp), testSchema, 
cellSize); + INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); + file.toUnderConstruction(clientName, clientMachine); + file.addBlock(stripedBlk); + fns.getEditLog().logAddBlock(testFilePath, file); + file.toCompleteFile(System.currentTimeMillis()); + fns.enterSafeMode(false); + fns.saveNamespace(0, 0); + fns.leaveSafeMode(); + + //update the last block + long newBlkNumBytes = 1024*8; + long newTimestamp = 1426222918+3600; + file.toUnderConstruction(clientName, clientMachine); + file.getLastBlock().setNumBytes(newBlkNumBytes); + file.getLastBlock().setGenerationStamp(newTimestamp); + fns.getEditLog().logUpdateBlocks(testFilePath, file, true); + file.toCompleteFile(System.currentTimeMillis()); + + //After the namenode restarts, if the loaded block is the same as above + //(new block size and timestamp) it means that we have successfully + //applied the edit log to the fsimage. + cluster.restartNameNodes(); + cluster.waitActive(); + fns = cluster.getNamesystem(); + + INodeFile inodeLoaded = (INodeFile)fns.getFSDirectory() + .getINode(testFilePath); + + assertTrue(inodeLoaded.isStriped()); + + BlockInfo[] blks = inodeLoaded.getBlocks(); + assertEquals(1, blks.length); + assertTrue(blks[0].isStriped()); + assertEquals(blkId, blks[0].getBlockId()); + assertEquals(newBlkNumBytes, blks[0].getNumBytes()); + assertEquals(newTimestamp, blks[0].getGenerationStamp()); + assertEquals(blockNum, ((BlockInfoStriped)blks[0]).getDataBlockNum()); + assertEquals(parityNum, ((BlockInfoStriped)blks[0]).getParityBlockNum()); + assertEquals(cellSize, ((BlockInfoStriped)blks[0]).getCellSize()); + + cluster.shutdown(); + cluster = null; + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testHasNonEcBlockUsingStripedIDForAddBlock() throws IOException{ + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) + 
.build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + FSNamesystem fns = cluster.getNamesystem(); + + String testDir = "/test_block_manager"; + String testFile = "testfile_addblock"; + String testFilePath = testDir + "/" + testFile; + String clientName = "testUser_addblock"; + String clientMachine = "testMachine_addblock"; + long blkId = -1; + long blkNumBytes = 1024; + long timestamp = 1426222918; + + fs.mkdir(new Path(testDir), new FsPermission("755")); + Path p = new Path(testFilePath); + + //check whether the hasNonEcBlockUsingStripedID is set + //after loading a addblock-editlog + DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); + BlockInfoContiguous cBlk = new BlockInfoContiguous( + new Block(blkId, blkNumBytes, timestamp), (short)3); + INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); + file.toUnderConstruction(clientName, clientMachine); + file.addBlock(cBlk); + fns.getEditLog().logAddBlock(testFilePath, file); + file.toCompleteFile(System.currentTimeMillis()); + cluster.restartNameNodes(); + cluster.waitActive(); + fns = cluster.getNamesystem(); + assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID()); + + cluster.shutdown(); + cluster = null; + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testHasNonEcBlockUsingStripedIDForUpdateBlocks() + throws IOException{ + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) + .build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + FSNamesystem fns = cluster.getNamesystem(); + + String testDir = "/test_block_manager"; + String testFile = "testfile_002"; + String testFilePath = testDir + "/" + testFile; + String clientName = "testUser2"; + String clientMachine = "testMachine1"; + long blkId = 100; + long blkNumBytes = 1024; + long timestamp = 1426222918; + 
+ fs.mkdir(new Path(testDir), new FsPermission("755")); + Path p = new Path(testFilePath); + + DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); + BlockInfoContiguous cBlk = new BlockInfoContiguous( + new Block(blkId, blkNumBytes, timestamp), (short)3); + INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); + file.toUnderConstruction(clientName, clientMachine); + file.addBlock(cBlk); + file.toCompleteFile(System.currentTimeMillis()); + + long newBlkNumBytes = 1024*8; + long newTimestamp = 1426222918+3600; + file.toUnderConstruction(clientName, clientMachine); + file.getLastBlock().setBlockId(-100); + file.getLastBlock().setNumBytes(newBlkNumBytes); + file.getLastBlock().setGenerationStamp(newTimestamp); + fns.getEditLog().logUpdateBlocks(testFilePath, file, true); + file.toCompleteFile(System.currentTimeMillis()); + cluster.restartNameNodes(); + cluster.waitActive(); + fns = cluster.getNamesystem(); + assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID()); + + cluster.shutdown(); + cluster = null; + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java index df20fd6..350ef1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java @@ -22,11 +22,25 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.DataOutput; +import 
java.io.DataOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.EnumSet; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.junit.Assert; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -34,6 +48,7 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -42,7 +57,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.util.MD5FileUtils; @@ -50,10 +64,16 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; import org.junit.Test; +import static org.junit.Assert.assertArrayEquals; + public class 
TestFSImage { private static final String HADOOP_2_7_ZER0_BLOCK_SIZE_TGZ = "image-with-zero-block-size.tar.gz"; + private static final ECSchema testSchema + = ErasureCodingSchemaManager.getSystemDefaultSchema(); + private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + @Test public void testPersist() throws IOException { Configuration conf = new Configuration(); @@ -159,6 +179,123 @@ public class TestFSImage { } } + private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf, + boolean isUC) throws IOException{ + // construct an INode with StripedBlock for saving and loading + fsn.createErasureCodingZone("/", null, 0, false); + long id = 123456789; + byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes(); + PermissionStatus permissionStatus = new PermissionStatus("testuser_a", + "testuser_groups", new FsPermission((short)0x755)); + long mtime = 1426222916-3600; + long atime = 1426222916; + BlockInfoContiguous[] blks = new BlockInfoContiguous[0]; + short replication = 3; + long preferredBlockSize = 128*1024*1024; + INodeFile file = new INodeFile(id, name, permissionStatus, mtime, atime, + blks, replication, preferredBlockSize, (byte) 0, true); + ByteArrayOutputStream bs = new ByteArrayOutputStream(); + + //construct StripedBlocks for the INode + BlockInfoStriped[] stripedBlks = new BlockInfoStriped[3]; + long stripedBlkId = 10000001; + long timestamp = mtime+3600; + for (int i = 0; i < stripedBlks.length; i++) { + stripedBlks[i] = new BlockInfoStriped( + new Block(stripedBlkId + i, preferredBlockSize, timestamp), + testSchema, cellSize); + file.addBlock(stripedBlks[i]); + } + + final String client = "testClient"; + final String clientMachine = "testClientMachine"; + final String path = "testUnderConstructionPath"; + + //save the INode to byte array + DataOutput out = new DataOutputStream(bs); + if (isUC) { + file.toUnderConstruction(client, clientMachine); + 
FSImageSerialization.writeINodeUnderConstruction((DataOutputStream) out, + file, path); + } else { + FSImageSerialization.writeINodeFile(file, out, false); + } + DataInput in = new DataInputStream( + new ByteArrayInputStream(bs.toByteArray())); + + // load the INode from the byte array + INodeFile fileByLoaded; + if (isUC) { + fileByLoaded = FSImageSerialization.readINodeUnderConstruction(in, + fsn, fsn.getFSImage().getLayoutVersion()); + } else { + fileByLoaded = (INodeFile) new FSImageFormat.Loader(conf, fsn) + .loadINodeWithLocalName(false, in, false); + } + + assertEquals(id, fileByLoaded.getId() ); + assertArrayEquals(isUC ? path.getBytes() : name, + fileByLoaded.getLocalName().getBytes()); + assertEquals(permissionStatus.getUserName(), + fileByLoaded.getPermissionStatus().getUserName()); + assertEquals(permissionStatus.getGroupName(), + fileByLoaded.getPermissionStatus().getGroupName()); + assertEquals(permissionStatus.getPermission(), + fileByLoaded.getPermissionStatus().getPermission()); + assertEquals(mtime, fileByLoaded.getModificationTime()); + assertEquals(isUC ? mtime : atime, fileByLoaded.getAccessTime()); + // TODO for striped blocks, we currently save and load them as contiguous + // blocks to/from legacy fsimage + assertEquals(3, fileByLoaded.getBlocks().length); + assertEquals(preferredBlockSize, fileByLoaded.getPreferredBlockSize()); + + if (isUC) { + assertEquals(client, + fileByLoaded.getFileUnderConstructionFeature().getClientName()); + assertEquals(clientMachine, + fileByLoaded.getFileUnderConstructionFeature().getClientMachine()); + } + } + + /** + * Test if a INodeFile with BlockInfoStriped can be saved by + * FSImageSerialization and loaded by FSImageFormat#Loader. 
+ */ + @Test + public void testSaveAndLoadStripedINodeFile() throws IOException{ + Configuration conf = new Configuration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + testSaveAndLoadStripedINodeFile(cluster.getNamesystem(), conf, false); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test if an INodeFileUnderConstruction with BlockInfoStriped can be + * saved and loaded by FSImageSerialization. + */ + @Test + public void testSaveAndLoadStripedINodeFileUC() throws IOException{ + // construct an INode with StripedBlock for saving and loading + Configuration conf = new Configuration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + testSaveAndLoadStripedINodeFile(cluster.getNamesystem(), conf, true); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + /** * Ensure that the digest written by the saver equals to the digest of the * file. @@ -260,8 +397,7 @@ public class TestFSImage { .format(false) .manageDataDfsDirs(false) .manageNameDfsDirs(false) - .waitSafeMode(false) - .startupOption(StartupOption.UPGRADE) + .waitSafeMode(false).startupOption(StartupOption.UPGRADE) .build(); try { FileSystem fs = cluster.getFileSystem(); @@ -274,4 +410,207 @@ public class TestFSImage { FileUtil.fullyDelete(dfsDir); } } + + /** + * Ensure that FSImage supports BlockGroup. 
+ */ + @Test + public void testSupportBlockGroup() throws IOException { + final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS + + HdfsConstants.NUM_PARITY_BLOCKS; + final int BLOCK_SIZE = 8 * 1024 * 1024; + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE) + .build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + fs.getClient().getNamenode().createErasureCodingZone("/", null, 0); + Path file = new Path("/striped"); + FSDataOutputStream out = fs.create(file); + byte[] bytes = DFSTestUtil.generateSequentialBytes(0, BLOCK_SIZE); + out.write(bytes); + out.close(); + + fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); + fs.saveNamespace(); + fs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE); + + cluster.restartNameNodes(); + fs = cluster.getFileSystem(); + assertTrue(fs.exists(file)); + + // check the information of striped blocks + FSNamesystem fsn = cluster.getNamesystem(); + INodeFile inode = fsn.dir.getINode(file.toString()).asFile(); + assertTrue(inode.isStriped()); + BlockInfo[] blks = inode.getBlocks(); + assertEquals(1, blks.length); + assertTrue(blks[0].isStriped()); + assertEquals(HdfsConstants.NUM_DATA_BLOCKS, ((BlockInfoStriped)blks[0]).getDataBlockNum()); + assertEquals(HdfsConstants.NUM_PARITY_BLOCKS, ((BlockInfoStriped)blks[0]).getParityBlockNum()); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testHasNonEcBlockUsingStripedIDForLoadFile() throws IOException{ + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) + .build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + FSNamesystem fns = cluster.getNamesystem(); + + String testDir = "/test_block_manager"; + String testFile = 
"testfile_loadfile"; + String testFilePath = testDir + "/" + testFile; + String clientName = "testUser_loadfile"; + String clientMachine = "testMachine_loadfile"; + long blkId = -1; + long blkNumBytes = 1024; + long timestamp = 1426222918; + + fs.mkdir(new Path(testDir), new FsPermission("755")); + Path p = new Path(testFilePath); + + DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); + BlockInfoContiguous cBlk = new BlockInfoContiguous( + new Block(blkId, blkNumBytes, timestamp), (short)3); + INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); + file.toUnderConstruction(clientName, clientMachine); + file.addBlock(cBlk); + file.toCompleteFile(System.currentTimeMillis()); + fns.enterSafeMode(false); + fns.saveNamespace(0, 0); + cluster.restartNameNodes(); + cluster.waitActive(); + fns = cluster.getNamesystem(); + assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID()); + + //after nonEcBlockUsingStripedID is deleted + //the hasNonEcBlockUsingStripedID is set to false + fs = cluster.getFileSystem(); + fs.delete(p,false); + fns.enterSafeMode(false); + fns.saveNamespace(0, 0); + cluster.restartNameNodes(); + cluster.waitActive(); + fns = cluster.getNamesystem(); + assertFalse(fns.getBlockManager().hasNonEcBlockUsingStripedID()); + + cluster.shutdown(); + cluster = null; + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testHasNonEcBlockUsingStripedIDForLoadUCFile() + throws IOException{ + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) + .build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + FSNamesystem fns = cluster.getNamesystem(); + + String testDir = "/test_block_manager"; + String testFile = "testfile_loaducfile"; + String testFilePath = testDir + "/" + testFile; + String clientName = "testUser_loaducfile"; + String clientMachine = 
"testMachine_loaducfile"; + long blkId = -1; + long blkNumBytes = 1024; + long timestamp = 1426222918; + + fs.mkdir(new Path(testDir), new FsPermission("755")); + Path p = new Path(testFilePath); + + DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); + BlockInfoContiguous cBlk = new BlockInfoContiguous( + new Block(blkId, blkNumBytes, timestamp), (short)3); + INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); + file.toUnderConstruction(clientName, clientMachine); + file.addBlock(cBlk); + fns.enterSafeMode(false); + fns.saveNamespace(0, 0); + cluster.restartNameNodes(); + cluster.waitActive(); + fns = cluster.getNamesystem(); + assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID()); + + cluster.shutdown(); + cluster = null; + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + @Test + public void testHasNonEcBlockUsingStripedIDForLoadSnapshot() + throws IOException{ + // start a cluster + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9) + .build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + FSNamesystem fns = cluster.getNamesystem(); + + String testDir = "/test_block_manager"; + String testFile = "testfile_loadSnapshot"; + String testFilePath = testDir + "/" + testFile; + String clientName = "testUser_loadSnapshot"; + String clientMachine = "testMachine_loadSnapshot"; + long blkId = -1; + long blkNumBytes = 1024; + long timestamp = 1426222918; + + Path d = new Path(testDir); + fs.mkdir(d, new FsPermission("755")); + fs.allowSnapshot(d); + + Path p = new Path(testFilePath); + DFSTestUtil.createFile(fs, p, 0, (short) 1, 1); + BlockInfoContiguous cBlk = new BlockInfoContiguous( + new Block(blkId, blkNumBytes, timestamp), (short)3); + INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); + file.toUnderConstruction(clientName, clientMachine); + file.addBlock(cBlk); + 
file.toCompleteFile(System.currentTimeMillis()); + + fs.createSnapshot(d,"testHasNonEcBlockUsingStripeID"); + fs.truncate(p,0); + fns.enterSafeMode(false); + fns.saveNamespace(0, 0); + cluster.restartNameNodes(); + cluster.waitActive(); + fns = cluster.getNamesystem(); + assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID()); + + cluster.shutdown(); + cluster = null; + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java index 767f4de..f6aae22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFileTruncate.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; @@ -1032,7 +1033,8 @@ public class TestFileTruncate { iip = fsn.getFSDirectory().getINodesInPath(src, true); file = iip.getLastINode().asFile(); file.recordModification(iip.getLatestSnapshotId(), true); - assertThat(file.isBlockInLatestSnapshot(file.getLastBlock()), 
is(true)); + assertThat(file.isBlockInLatestSnapshot( + (BlockInfoContiguous) file.getLastBlock()), is(true)); initialGenStamp = file.getLastBlock().getGenerationStamp(); // Test that prepareFileForTruncate sets up copy-on-write truncate fsn.writeLock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java index 8818f17..04e83a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java @@ -84,6 +84,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.Result; +import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ReplicationResult; +import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck.ErasureCodingResult; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.tools.DFSck; import org.apache.hadoop.io.IOUtils; @@ -1058,13 +1060,14 @@ public class TestFsck { final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(pathString); assertNotNull(file); - Result res = new Result(conf); - fsck.check(pathString, file, res); + Result replRes = new ReplicationResult(conf); + Result ecRes = new ErasureCodingResult(conf); + fsck.check(pathString, file, replRes, ecRes); // Also print the output from the fsck, for ex post facto sanity checks 
System.out.println(result.toString()); - assertEquals(res.missingReplicas, + assertEquals(replRes.missingReplicas, (NUM_BLOCKS*REPL_FACTOR) - (NUM_BLOCKS*NUM_REPLICAS)); - assertEquals(res.numExpectedReplicas, NUM_BLOCKS*REPL_FACTOR); + assertEquals(replRes.numExpectedReplicas, NUM_BLOCKS*REPL_FACTOR); } finally { if(dfs != null) { dfs.close(); @@ -1135,10 +1138,11 @@ public class TestFsck { final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(pathString); assertNotNull(file); - Result res = new Result(conf); - fsck.check(pathString, file, res); + Result replRes = new ReplicationResult(conf); + Result ecRes = new ErasureCodingResult(conf); + fsck.check(pathString, file, replRes, ecRes); // check misReplicatedBlock number. - assertEquals(res.numMisReplicatedBlocks, NUM_BLOCKS); + assertEquals(replRes.numMisReplicatedBlocks, NUM_BLOCKS); } finally { if(dfs != null) { dfs.close(); @@ -1198,15 +1202,16 @@ public class TestFsck { HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication, blockSize, modTime, accessTime, perms, owner, group, symlink, - path, fileId, numChildren, null, storagePolicy); - Result res = new Result(conf); + path, fileId, numChildren, null, storagePolicy, null, 0); + Result replRes = new ReplicationResult(conf); + Result ecRes = new ErasureCodingResult(conf); try { - fsck.check(pathString, file, res); + fsck.check(pathString, file, replRes, ecRes); } catch (Exception e) { fail("Unexpected exception " + e.getMessage()); } - assertTrue(res.toString().contains("HEALTHY")); + assertTrue(replRes.isHealthy()); } /** Test fsck with symlinks in the filesystem */ @@ -1629,4 +1634,60 @@ public class TestFsck { } } } + + @Test + public void testECFsck() throws Exception { + MiniDFSCluster cluster = null; + FileSystem fs = null; + try { + Configuration conf = new HdfsConfiguration(); + final long precision = 1L; + conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision); + 
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); + int totalSize = ErasureCodingSchemaManager.getSystemDefaultSchema().getNumDataUnits() + + ErasureCodingSchemaManager.getSystemDefaultSchema().getNumParityUnits(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(totalSize).build(); + fs = cluster.getFileSystem(); + + // create a contiguous file + Path replDirPath = new Path("/replicated"); + Path replFilePath = new Path(replDirPath, "replfile"); + final short factor = 3; + DFSTestUtil.createFile(fs, replFilePath, 1024, factor, 0); + DFSTestUtil.waitReplication(fs, replFilePath, factor); + + // create a large striped file + Path ecDirPath = new Path("/striped"); + Path largeFilePath = new Path(ecDirPath, "largeFile"); + DFSTestUtil.createStripedFile(cluster, largeFilePath, ecDirPath, 1, 2, true); + + // create a small striped file + Path smallFilePath = new Path(ecDirPath, "smallFile"); + DFSTestUtil.writeFile(fs, smallFilePath, "hello world!"); + + long replTime = fs.getFileStatus(replFilePath).getAccessTime(); + long ecTime = fs.getFileStatus(largeFilePath).getAccessTime(); + Thread.sleep(precision); + setupAuditLogs(); + String outStr = runFsck(conf, 0, true, "/"); + verifyAuditLogs(); + assertEquals(replTime, fs.getFileStatus(replFilePath).getAccessTime()); + assertEquals(ecTime, fs.getFileStatus(largeFilePath).getAccessTime()); + System.out.println(outStr); + assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); + if (fs != null) {try{fs.close();} catch(Exception e){}} + cluster.shutdown(); + + // restart the cluster; bring up namenode but not the data nodes + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0).format(false).build(); + outStr = runFsck(conf, 1, true, "/"); + // expect the result is corrupt + assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS)); + System.out.println(outStr); + } finally { + if (fs != null) {try{fs.close();} catch(Exception e){}} + if (cluster != null) { cluster.shutdown(); } + } + 
} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java index b8db998..b877181 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java @@ -94,7 +94,7 @@ public class TestINodeFile { private static INodeFile createINodeFile(byte storagePolicyID) { return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, - null, (short)3, 1024L, storagePolicyID); + null, (short)3, 1024L, storagePolicyID, false); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java new file mode 100644 index 0000000..10b77df --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestQuotaWithStripedBlocks.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.erasurecode.ECSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +/** + * Make sure we correctly update the quota usage with the striped blocks. 
+ */ +public class TestQuotaWithStripedBlocks { + private static final int BLOCK_SIZE = 1024 * 1024; + private static final long DISK_QUOTA = BLOCK_SIZE * 10; + private static final ECSchema ecSchema = + ErasureCodingSchemaManager.getSystemDefaultSchema(); + private static final int NUM_DATA_BLOCKS = ecSchema.getNumDataUnits(); + private static final int NUM_PARITY_BLOCKS = ecSchema.getNumParityUnits(); + private static final int GROUP_SIZE = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + private static final Path ecDir = new Path("/ec"); + + private MiniDFSCluster cluster; + private FSDirectory dir; + private DistributedFileSystem dfs; + + @Before + public void setUp() throws IOException { + final Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE).build(); + cluster.waitActive(); + + dir = cluster.getNamesystem().getFSDirectory(); + dfs = cluster.getFileSystem(); + + dfs.mkdirs(ecDir); + dfs.getClient().createErasureCodingZone(ecDir.toString(), ecSchema, 0); + dfs.setQuota(ecDir, Long.MAX_VALUE - 1, DISK_QUOTA); + dfs.setQuotaByStorageType(ecDir, StorageType.DISK, DISK_QUOTA); + dfs.setStoragePolicy(ecDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testUpdatingQuotaCount() throws Exception { + final Path file = new Path(ecDir, "file"); + FSDataOutputStream out = null; + + try { + out = dfs.create(file, (short) 1); + + INodeFile fileNode = dir.getINode4Write(file.toString()).asFile(); + ExtendedBlock previous = null; + // Create striped blocks which have a cell in each block. 
+ Block newBlock = DFSTestUtil.addStripedBlockToFile(cluster.getDataNodes(), + dfs, cluster.getNamesystem(), file.toString(), fileNode, + dfs.getClient().getClientName(), previous, 1); + previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(), + newBlock); + + final INodeDirectory dirNode = dir.getINode4Write(ecDir.toString()) + .asDirectory(); + final long spaceUsed = dirNode.getDirectoryWithQuotaFeature() + .getSpaceConsumed().getStorageSpace(); + final long diskUsed = dirNode.getDirectoryWithQuotaFeature() + .getSpaceConsumed().getTypeSpaces().get(StorageType.DISK); + // When we add a new block we update the quota using the full block size. + Assert.assertEquals(BLOCK_SIZE * GROUP_SIZE, spaceUsed); + Assert.assertEquals(BLOCK_SIZE * GROUP_SIZE, diskUsed); + + dfs.getClient().getNamenode().complete(file.toString(), + dfs.getClient().getClientName(), previous, fileNode.getId()); + + final long actualSpaceUsed = dirNode.getDirectoryWithQuotaFeature() + .getSpaceConsumed().getStorageSpace(); + final long actualDiskUsed = dirNode.getDirectoryWithQuotaFeature() + .getSpaceConsumed().getTypeSpaces().get(StorageType.DISK); + // In this case the file's real size is cell size * block group size. 
+ Assert.assertEquals(HdfsConstants.BLOCK_STRIPED_CELL_SIZE * GROUP_SIZE, + actualSpaceUsed); + Assert.assertEquals(HdfsConstants.BLOCK_STRIPED_CELL_SIZE * GROUP_SIZE, + actualDiskUsed); + } finally { + IOUtils.cleanup(null, out); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java new file mode 100644 index 0000000..3134373 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRecoverStripedBlocks.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.junit.Test; +import java.util.List; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestRecoverStripedBlocks { + private final short GROUP_SIZE = + NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + private MiniDFSCluster cluster; + private final Path dirPath = new Path("/dir"); + private Path filePath = new Path(dirPath, "file"); + private int maxReplicationStreams = + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT; + + private void initConf(Configuration conf) { + // Large value to make sure the pending replication request can stay in + // DatanodeDescriptor.replicateBlocks before test timeout. 
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100); + // Make sure BlockManager can pull all blocks from UnderReplicatedBlocks via + // chooseUnderReplicatedBlocks at once. + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION, 5); + } + + @Test + public void testMissingStripedBlock() throws Exception { + doTestMissingStripedBlock(1, 0); + } + + @Test + public void testMissingStripedBlockWithBusyNode1() throws Exception { + doTestMissingStripedBlock(2, 1); + } + + @Test + public void testMissingStripedBlockWithBusyNode2() throws Exception { + doTestMissingStripedBlock(3, 1); + } + + /** + * Start GROUP_SIZE + 1 datanodes. + * Inject striped blocks to first GROUP_SIZE datanodes. + * Then make numOfBusy datanodes busy, make numOfMissed datanodes missed. + * Then trigger BlockManager to compute recovery works. (so all recovery work + * will be scheduled to the last datanode) + * Finally, verify the recovery work of the last datanode. + */ + private void doTestMissingStripedBlock(int numOfMissed, int numOfBusy) + throws Exception { + Configuration conf = new HdfsConfiguration(); + initConf(conf); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 1) + .build(); + + try { + cluster.waitActive(); + final int numBlocks = 4; + DFSTestUtil.createStripedFile(cluster, filePath, + dirPath, numBlocks, 1, true); + // all blocks will be located at first GROUP_SIZE DNs, the last DN is + // empty because of the util function createStripedFile + + // make sure the file is complete in NN + final INodeFile fileNode = cluster.getNamesystem().getFSDirectory() + .getINode4Write(filePath.toString()).asFile(); + assertFalse(fileNode.isUnderConstruction()); + assertTrue(fileNode.isStriped()); + BlockInfo[] blocks = fileNode.getBlocks(); + assertEquals(numBlocks, blocks.length); + for (BlockInfo blk : blocks) { + assertTrue(blk.isStriped()); + assertTrue(blk.isComplete()); + assertEquals(BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS, 
+ blk.getNumBytes()); + final BlockInfoStriped sb = (BlockInfoStriped) blk; + assertEquals(GROUP_SIZE, sb.numNodes()); + } + + final BlockManager bm = cluster.getNamesystem().getBlockManager(); + BlockInfo firstBlock = fileNode.getBlocks()[0]; + DatanodeStorageInfo[] storageInfos = bm.getStorages(firstBlock); + + // make numOfBusy nodes busy + int i = 0; + for (; i < numOfBusy; i++) { + DatanodeDescriptor busyNode = storageInfos[i].getDatanodeDescriptor(); + for (int j = 0; j < maxReplicationStreams + 1; j++) { + BlockManagerTestUtil.addBlockToBeReplicated(busyNode, new Block(j), + new DatanodeStorageInfo[]{storageInfos[0]}); + } + } + + // make numOfMissed internal blocks missed + for (; i < numOfBusy + numOfMissed; i++) { + DatanodeDescriptor missedNode = storageInfos[i].getDatanodeDescriptor(); + assertEquals(numBlocks, missedNode.numBlocks()); + bm.getDatanodeManager().removeDatanode(missedNode); + } + + BlockManagerTestUtil.getComputedDatanodeWork(bm); + + // all the recovery work will be scheduled on the last DN + DataNode lastDn = cluster.getDataNodes().get(GROUP_SIZE); + DatanodeDescriptor last = + bm.getDatanodeManager().getDatanode(lastDn.getDatanodeId()); + assertEquals("Counting the number of outstanding EC tasks", numBlocks, + last.getNumberOfBlocksToBeErasureCoded()); + List<BlockECRecoveryInfo> recovery = + last.getErasureCodeCommand(numBlocks); + for (BlockECRecoveryInfo info : recovery) { + assertEquals(1, info.getTargetDnInfos().length); + assertEquals(last, info.getTargetDnInfos()[0]); + assertEquals(info.getSourceDnInfos().length, + info.getLiveBlockIndices().length); + if (GROUP_SIZE - numOfMissed == NUM_DATA_BLOCKS) { + // It's a QUEUE_HIGHEST_PRIORITY block, so the busy DNs will be chosen + // to make sure we have NUM_DATA_BLOCKS DNs to do recovery work. 
+ assertEquals(NUM_DATA_BLOCKS, info.getSourceDnInfos().length); + } else { + // The block has no highest priority, so we don't use the busy DNs as + // sources + assertEquals(GROUP_SIZE - numOfMissed - numOfBusy, + info.getSourceDnInfos().length); + } + } + } finally { + cluster.shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java new file mode 100644 index 0000000..9c6a590 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStripedINodeFile.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.namenode; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.io.erasurecode.ECSchema; + +import org.junit.Test; + +/** + * This class tests INodeFile with striped feature. 
+ */ +public class TestStripedINodeFile { + public static final Log LOG = LogFactory.getLog(TestINodeFile.class); + + private static final PermissionStatus perm = new PermissionStatus( + "userName", null, FsPermission.getDefault()); + + private final BlockStoragePolicySuite defaultSuite = + BlockStoragePolicySuite.createDefaultSuite(); + private final BlockStoragePolicy defaultPolicy = + defaultSuite.getDefaultPolicy(); + + private static final ECSchema testSchema + = ErasureCodingSchemaManager.getSystemDefaultSchema(); + private static final int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + + private static INodeFile createStripedINodeFile() { + return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, + null, (short)0, 1024L, HdfsServerConstants.COLD_STORAGE_POLICY_ID, true); + } + + @Test + public void testBlockStripedFeature() + throws IOException, InterruptedException{ + INodeFile inf = createStripedINodeFile(); + assertTrue(inf.isStriped()); + } + + @Test + public void testBlockStripedTotalBlockCount() { + Block blk = new Block(1); + BlockInfoStriped blockInfoStriped + = new BlockInfoStriped(blk, testSchema, cellSize); + assertEquals(9, blockInfoStriped.getTotalBlockNum()); + } + + @Test + public void testBlockStripedLength() + throws IOException, InterruptedException { + INodeFile inf = createStripedINodeFile(); + Block blk = new Block(1); + BlockInfoStriped blockInfoStriped + = new BlockInfoStriped(blk, testSchema, cellSize); + inf.addBlock(blockInfoStriped); + assertEquals(1, inf.getBlocks().length); + } + + @Test + public void testBlockStripedConsumedSpace() + throws IOException, InterruptedException { + INodeFile inf = createStripedINodeFile(); + Block blk = new Block(1); + BlockInfoStriped blockInfoStriped + = new BlockInfoStriped(blk, testSchema, cellSize); + blockInfoStriped.setNumBytes(1); + inf.addBlock(blockInfoStriped); + // 0. Calculate the total bytes per stripes <Num Bytes per Stripes> + // 1. 
Calculate the number of stripes in this block group. <Num Stripes> + // 2. Calculate the last remaining length which does not make a stripe. <Last Stripe Length> + // 3. Total consumed space is the total of + // a. The total of the full cells of data blocks and parity blocks. + // b. The remaining of data block which does not make a stripe. + // c. The last parity block cells. These size should be same + // to the first cell in this stripe. + // So the total consumed space is the sum of + // a. <Cell Size> * (<Num Stripes> - 1) * <Total Block Num> = 0 + // b. <Num Bytes> % <Num Bytes per Stripes> = 1 + // c. <Last Stripe Length> * <Parity Block Num> = 1 * 3 + assertEquals(4, inf.storagespaceConsumedStriped().getStorageSpace()); + assertEquals(4, inf.storagespaceConsumed(defaultPolicy).getStorageSpace()); + } + + @Test + public void testMultipleBlockStripedConsumedSpace() + throws IOException, InterruptedException { + INodeFile inf = createStripedINodeFile(); + Block blk1 = new Block(1); + BlockInfoStriped blockInfoStriped1 + = new BlockInfoStriped(blk1, testSchema, cellSize); + blockInfoStriped1.setNumBytes(1); + Block blk2 = new Block(2); + BlockInfoStriped blockInfoStriped2 + = new BlockInfoStriped(blk2, testSchema, cellSize); + blockInfoStriped2.setNumBytes(1); + inf.addBlock(blockInfoStriped1); + inf.addBlock(blockInfoStriped2); + // This is the double size of one block in above case. + assertEquals(4 * 2, inf.storagespaceConsumedStriped().getStorageSpace()); + assertEquals(4 * 2, inf.storagespaceConsumed(defaultPolicy).getStorageSpace()); + } + + @Test + public void testBlockStripedFileSize() + throws IOException, InterruptedException { + INodeFile inf = createStripedINodeFile(); + Block blk = new Block(1); + BlockInfoStriped blockInfoStriped + = new BlockInfoStriped(blk, testSchema, cellSize); + blockInfoStriped.setNumBytes(100); + inf.addBlock(blockInfoStriped); + // Compute file size should return actual data + // size which is retained by this file. 
+ assertEquals(100, inf.computeFileSize()); + assertEquals(100, inf.computeFileSize(false, false)); + } + + @Test + public void testBlockUCStripedFileSize() + throws IOException, InterruptedException { + INodeFile inf = createStripedINodeFile(); + Block blk = new Block(1); + BlockInfoStripedUnderConstruction bInfoUCStriped + = new BlockInfoStripedUnderConstruction(blk, testSchema, cellSize); + bInfoUCStriped.setNumBytes(100); + inf.addBlock(bInfoUCStriped); + assertEquals(100, inf.computeFileSize()); + assertEquals(0, inf.computeFileSize(false, false)); + } + + @Test + public void testBlockStripedComputeQuotaUsage() + throws IOException, InterruptedException { + INodeFile inf = createStripedINodeFile(); + Block blk = new Block(1); + BlockInfoStriped blockInfoStriped + = new BlockInfoStriped(blk, testSchema, cellSize); + blockInfoStriped.setNumBytes(100); + inf.addBlock(blockInfoStriped); + + QuotaCounts counts = + inf.computeQuotaUsageWithStriped(defaultPolicy, + new QuotaCounts.Builder().build()); + assertEquals(1, counts.getNameSpace()); + // The total consumed space is the sum of + // a. <Cell Size> * (<Num Stripes> - 1) * <Total Block Num> = 0 + // b. <Num Bytes> % <Num Bytes per Stripes> = 100 + // c. 
<Last Stripe Length> * <Parity Block Num> = 100 * 3 + assertEquals(400, counts.getStorageSpace()); + } + + @Test + public void testBlockUCStripedComputeQuotaUsage() + throws IOException, InterruptedException { + INodeFile inf = createStripedINodeFile(); + Block blk = new Block(1); + BlockInfoStripedUnderConstruction bInfoUCStriped + = new BlockInfoStripedUnderConstruction(blk, testSchema, cellSize); + bInfoUCStriped.setNumBytes(100); + inf.addBlock(bInfoUCStriped); + + QuotaCounts counts + = inf.computeQuotaUsageWithStriped(defaultPolicy, + new QuotaCounts.Builder().build()); + assertEquals(1024, inf.getPreferredBlockSize()); + assertEquals(1, counts.getNameSpace()); + // Consumed space in the case of BlockInfoUCStriped can be calculated + // by using preferred block size. This is 1024 and total block num + // is 9(= 3 + 6). Consumed storage space should be 1024 * 9 = 9216. + assertEquals(9216, counts.getStorageSpace()); + } + + /** + * Test the behavior of striped and contiguous block deletions. 
+ */ + @Test(timeout = 60000) + public void testDeleteOp() throws Exception { + MiniDFSCluster cluster = null; + try { + final int len = 1024; + final Path parentDir = new Path("/parentDir"); + final Path zone = new Path(parentDir, "zone"); + final Path zoneFile = new Path(zone, "zoneFile"); + final Path contiguousFile = new Path(parentDir, "someFile"); + final DistributedFileSystem dfs; + final Configuration conf = new Configuration(); + final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS + + HdfsConstants.NUM_PARITY_BLOCKS; + conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY, 2); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE) + .build(); + cluster.waitActive(); + + FSNamesystem fsn = cluster.getNamesystem(); + dfs = cluster.getFileSystem(); + dfs.mkdirs(zone); + + // create erasure zone + dfs.createErasureCodingZone(zone, null, 0); + DFSTestUtil.createFile(dfs, zoneFile, len, (short) 1, 0xFEED); + DFSTestUtil.createFile(dfs, contiguousFile, len, (short) 1, 0xFEED); + final FSDirectory fsd = fsn.getFSDirectory(); + + // Case-1: Verify the behavior of striped blocks + // Get blocks of striped file + INode inodeStriped = fsd.getINode("/parentDir/zone/zoneFile"); + assertTrue("Failed to get INodeFile for /parentDir/zone/zoneFile", + inodeStriped instanceof INodeFile); + INodeFile inodeStripedFile = (INodeFile) inodeStriped; + BlockInfo[] stripedBlks = inodeStripedFile.getBlocks(); + for (BlockInfo blockInfo : stripedBlks) { + assertFalse("Mistakenly marked the block as deleted!", + blockInfo.isDeleted()); + } + + // delete erasure zone directory + dfs.delete(zone, true); + for (BlockInfo blockInfo : stripedBlks) { + assertTrue("Didn't mark the block as deleted!", blockInfo.isDeleted()); + } + + // Case-2: Verify the behavior of contiguous blocks + // Get blocks of contiguous file + INode inode = fsd.getINode("/parentDir/someFile"); + assertTrue("Failed to get INodeFile for /parentDir/someFile", + inode instanceof INodeFile); 
+ INodeFile inodeFile = (INodeFile) inode; + BlockInfo[] contiguousBlks = inodeFile.getBlocks(); + for (BlockInfo blockInfo : contiguousBlks) { + assertFalse("Mistakenly marked the block as deleted!", + blockInfo.isDeleted()); + } + + // delete parent directory + dfs.delete(parentDir, true); + for (BlockInfo blockInfo : contiguousBlks) { + assertTrue("Didn't mark the block as deleted!", blockInfo.isDeleted()); + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java index a1abd08..57026cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java @@ -175,7 +175,7 @@ public class SnapshotTestHelper { * localName (className@hashCode) parent permission group user * * Specific information for different types of INode: - * {@link INodeDirectory}:childrenSize + * {@link INodeDirectory}:childrenSize * {@link INodeFile}: fileSize, block list. Check {@link BlockInfo#toString()} * and {@link BlockInfoContiguousUnderConstruction#toString()} for detailed information. * {@link FileWithSnapshot}: next link