Merge HDFS-8394 from trunk: Move getAdditionalBlock() and related functionalities into a separate class.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f3466727 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f3466727 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f3466727 Branch: refs/heads/HDFS-7285 Commit: f346672747978e92b6a70101e6bc1befe932d09b Parents: 10a7482 Author: Jing Zhao <ji...@apache.org> Authored: Sat May 16 16:57:12 2015 -0700 Committer: Jing Zhao <ji...@apache.org> Committed: Sat May 16 16:57:12 2015 -0700 ---------------------------------------------------------------------- .../blockmanagement/BlockInfoContiguous.java | 2 +- .../server/blockmanagement/BlockManager.java | 8 +- .../hdfs/server/namenode/FSDirWriteFileOp.java | 120 +++++++++++-------- .../hdfs/server/namenode/FSNamesystem.java | 6 +- .../hadoop/hdfs/server/namenode/INodeFile.java | 8 -- .../hadoop/hdfs/util/StripedBlockUtil.java | 2 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 2 +- 7 files changed, 81 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3466727/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index eeab076..416091f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -42,7 +42,7 @@ public class BlockInfoContiguous extends BlockInfo { * @param from BlockReplicationInfo to copy from. */ protected BlockInfoContiguous(BlockInfoContiguous from) { - this(from, from.getBlockCollection().getBlockReplication()); + this(from, from.getBlockCollection().getPreferredBlockReplication()); this.triplets = new Object[from.triplets.length]; this.setBlockCollection(from.getBlockCollection()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3466727/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 2e7855e..9cdfa05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -3560,6 +3560,11 @@ public class BlockManager { return storages; } + /** @return an iterator of the datanodes. */ + public Iterable<DatanodeStorageInfo> getStorages(final Block block) { + return blocksMap.getStorages(block); + } + public int getTotalBlocks() { return blocksMap.size(); } @@ -3951,7 +3956,7 @@ public class BlockManager { null); } - public LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info, + public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info, DatanodeStorageInfo[] locs, long offset) throws IOException { final LocatedBlock lb; if (info.isStriped()) { @@ -3961,7 +3966,6 @@ public class BlockManager { } else { lb = newLocatedBlock(eb, locs, offset, false); } - setBlockToken(lb, BlockTokenIdentifier.AccessMode.WRITE); return lb; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3466727/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 1ff0899..324cc16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -26,12 +26,15 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; +import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.net.Node; import org.apache.hadoop.net.NodeBase; @@ -50,7 +53,7 @@ class FSDirWriteFileOp { Block block) throws IOException { // modify file-> block and blocksMap // fileNode should be under construction - BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block); + BlockInfoUnderConstruction uc = fileNode.removeLastBlock(block); if (uc == null) { return false; } @@ -64,7 +67,7 @@ class FSDirWriteFileOp { // update space consumed fsd.updateCount(iip, 0, -fileNode.getPreferredBlockSize(), - fileNode.getPreferredBlockReplication(), true); + fileNode.getPreferredBlockReplication(), true); return true; } @@ -144,7 +147,7 @@ class FSDirWriteFileOp { String src, long fileId, String clientName, ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException { final long blockSize; - final int replication; + final short numTargets; final byte storagePolicyID; String clientMachine; @@ -172,18 +175,21 @@ class FSDirWriteFileOp { blockSize = pendingFile.getPreferredBlockSize(); clientMachine = pendingFile.getFileUnderConstructionFeature() .getClientMachine(); - replication = pendingFile.getFileReplication(); + boolean isStriped = pendingFile.isStriped(); + numTargets = isStriped ? + HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : + pendingFile.getFileReplication(); storagePolicyID = pendingFile.getStoragePolicyID(); - return new ValidateAddBlockResult(blockSize, replication, storagePolicyID, - clientMachine); + return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID, + clientMachine); } - static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk, + static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk, DatanodeStorageInfo[] locs, long offset) throws IOException { LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk), - locs, offset, false); + blk, locs, offset); fsn.getBlockManager().setBlockToken(lBlk, - BlockTokenIdentifier.AccessMode.WRITE); + BlockTokenIdentifier.AccessMode.WRITE); return lBlk; } @@ -212,9 +218,10 @@ class FSDirWriteFileOp { return onRetryBlock[0]; } else { // add new chosen targets to already allocated block and return - BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock(); - ((BlockInfoContiguousUnderConstruction) lastBlockInFile) - .setExpectedLocations(targets); + BlockInfo lastBlockInFile = pendingFile.getLastBlock(); + final BlockInfoUnderConstruction uc + = (BlockInfoUnderConstruction)lastBlockInFile; + uc.setExpectedLocations(targets); offset = pendingFile.computeFileSize(); return makeLocatedBlock(fsn, lastBlockInFile, targets, offset); } @@ -225,15 +232,17 @@ class FSDirWriteFileOp { ExtendedBlock.getLocalBlock(previous)); // allocate new block, record block locations in INode. - Block newBlock = fsn.createNewBlock(); + final boolean isStriped = pendingFile.isStriped(); + // allocate new block, record block locations in INode. + Block newBlock = fsn.createNewBlock(isStriped); INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile); - saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets); + saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, isStriped); persistNewBlock(fsn, src, pendingFile); offset = pendingFile.computeFileSize(); // Return located block - return makeLocatedBlock(fsn, newBlock, targets, offset); + return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset); } static DatanodeStorageInfo[] chooseTargetForNewBlock( @@ -254,7 +263,7 @@ class FSDirWriteFileOp { : Arrays.asList(favoredNodes); // choose targets for the new block to be allocated. - return bm.chooseTarget4NewBlock(src, r.replication, clientNode, + return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode, excludedNodesSet, r.blockSize, favoredNodesList, r.storagePolicyID); } @@ -280,25 +289,38 @@ class FSDirWriteFileOp { /** * Add a block to the file. Returns a reference to the added block. */ - private static BlockInfoContiguous addBlock( - FSDirectory fsd, String path, INodesInPath inodesInPath, Block block, - DatanodeStorageInfo[] targets) throws IOException { + private static BlockInfo addBlock(FSDirectory fsd, String path, + INodesInPath inodesInPath, Block block, DatanodeStorageInfo[] targets, + boolean isStriped) throws IOException { fsd.writeLock(); try { final INodeFile fileINode = inodesInPath.getLastINode().asFile(); Preconditions.checkState(fileINode.isUnderConstruction()); - // check quota limits and updated space consumed - fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), - fileINode.getPreferredBlockReplication(), true); - // associate new last block for the file - BlockInfoContiguousUnderConstruction blockInfo = - new BlockInfoContiguousUnderConstruction( - block, - fileINode.getFileReplication(), - HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, + final BlockInfo blockInfo; + if (isStriped) { + ECSchema ecSchema = fsd.getECSchema(inodesInPath); + short numDataUnits = (short) ecSchema.getNumDataUnits(); + short numParityUnits = (short) ecSchema.getNumParityUnits(); + short numLocations = (short) (numDataUnits + numParityUnits); + + // check quota limits and updated space consumed + fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), + numLocations, true); + blockInfo = new BlockInfoStripedUnderConstruction(block, numDataUnits, + numParityUnits, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, + targets); + } else { + // check quota limits and updated space consumed + fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), + fileINode.getPreferredBlockReplication(), true); + + short numLocations = fileINode.getFileReplication(); + blockInfo = new BlockInfoContiguousUnderConstruction(block, + numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); + } fsd.getBlockManager().addBlockCollection(blockInfo, fileINode); fileINode.addBlock(blockInfo); @@ -317,7 +339,7 @@ class FSDirWriteFileOp { private static FileState analyzeFileState( FSNamesystem fsn, String src, long fileId, String clientName, ExtendedBlock previous, LocatedBlock[] onRetryBlock) - throws IOException { + throws IOException { assert fsn.hasReadLock(); checkBlock(fsn, previous); @@ -345,9 +367,8 @@ class FSDirWriteFileOp { src = iip.getPath(); } } - final INodeFile file = fsn.checkLease(src, clientName, - inode, fileId); - BlockInfoContiguous lastBlockInFile = file.getLastBlock(); + final INodeFile file = fsn.checkLease(src, clientName, inode, fileId); + BlockInfo lastBlockInFile = file.getLastBlock(); if (!Block.matchingIdAndGenStamp(previousBlock, lastBlockInFile)) { // The block that the client claims is the current last block // doesn't match up with what we think is the last block. There are @@ -375,7 +396,7 @@ class FSDirWriteFileOp { // changed the namesystem state yet. // We run this analysis again in Part II where case 4 is impossible. - BlockInfoContiguous penultimateBlock = file.getPenultimateBlock(); + BlockInfo penultimateBlock = file.getPenultimateBlock(); if (previous == null && lastBlockInFile != null && lastBlockInFile.getNumBytes() >= file.getPreferredBlockSize() && @@ -401,8 +422,8 @@ class FSDirWriteFileOp { "allocation of a new block in " + src + ". Returning previously" + " allocated block " + lastBlockInFile); long offset = file.computeFileSize(); - BlockInfoContiguousUnderConstruction lastBlockUC = - (BlockInfoContiguousUnderConstruction) lastBlockInFile; + BlockInfoUnderConstruction lastBlockUC = + (BlockInfoUnderConstruction) lastBlockInFile; onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile, lastBlockUC.getExpectedStorageLocations(), offset); return new FileState(file, src, iip); @@ -427,14 +448,8 @@ class FSDirWriteFileOp { checkBlock(fsn, last); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); src = fsn.dir.resolvePath(pc, src, pathComponents); - boolean success = completeFileInternal(fsn, src, holder, - ExtendedBlock.getLocalBlock(last), - fileId); - if (success) { - NameNode.stateChangeLog.info("DIR* completeFile: " + srcArg - + " is closed by " + holder); - } - return success; + return completeFileInternal(fsn, src, holder, + ExtendedBlock.getLocalBlock(last), fileId); } private static boolean completeFileInternal( @@ -522,13 +537,12 @@ class FSDirWriteFileOp { * @param targets target datanodes where replicas of the new block is placed * @throws QuotaExceededException If addition of block exceeds space quota */ - private static void saveAllocatedBlock( - FSNamesystem fsn, String src, INodesInPath inodesInPath, Block newBlock, - DatanodeStorageInfo[] targets) - throws IOException { + private static void saveAllocatedBlock(FSNamesystem fsn, String src, + INodesInPath inodesInPath, Block newBlock, DatanodeStorageInfo[] targets, + boolean isStriped) throws IOException { assert fsn.hasWriteLock(); - BlockInfoContiguous b = addBlock(fsn.dir, src, inodesInPath, newBlock, - targets); + BlockInfo b = addBlock(fsn.dir, src, inodesInPath, newBlock, targets, + isStriped); NameNode.stateChangeLog.info("BLOCK* allocate " + b + " for " + src); DatanodeStorageInfo.incrementBlocksScheduled(targets); } @@ -547,15 +561,15 @@ class FSDirWriteFileOp { static class ValidateAddBlockResult { final long blockSize; - final int replication; + final int numTargets; final byte storagePolicyID; final String clientMachine; ValidateAddBlockResult( - long blockSize, int replication, byte storagePolicyID, + long blockSize, int numTargets, byte storagePolicyID, String clientMachine) { this.blockSize = blockSize; - this.replication = replication; + this.numTargets = numTargets; this.storagePolicyID = storagePolicyID; this.clientMachine = clientMachine; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3466727/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 9a27105..a8db772 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3273,6 +3273,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, writeUnlock(); } getEditLog().logSync(); + if (success) { + NameNode.stateChangeLog.info("DIR* completeFile: " + src + + " is closed by " + holder); + } return success; } @@ -3280,7 +3284,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * Create new block with a unique block id and a new generation stamp. * @param isStriped is the file under striping or contiguous layout? */ - Block createNewBlock() throws IOException { + Block createNewBlock(boolean isStriped) throws IOException { assert hasWriteLock(); Block b = new Block(nextBlockId(isStriped), 0, 0); // Increment the generation stamp for every new block. http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3466727/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 154198c..41287e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -905,14 +905,6 @@ public class INodeFile extends INodeWithAdditionalFields return counts; } - public final short getReplication(int lastSnapshotId) { - if (lastSnapshotId != CURRENT_STATE_ID) { - return getFileReplication(lastSnapshotId); - } else { - return getBlockReplication(); - } - } - /** * Return the penultimate allocated block for this file. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3466727/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index f7ae88a..c95f0b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -87,7 +87,7 @@ public class StripedBlockUtil { new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, - bg.getStartOffset() + idxInBlockGroup, bg.isCorrupt(), + bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(), null); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f3466727/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 0165189..9f106cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1915,7 +1915,7 @@ public class DFSTestUtil { fileNode.getId(), null); final BlockInfo lastBlock = fileNode.getLastBlock(); - final int groupSize = fileNode.getBlockReplication(); + final int groupSize = fileNode.getPreferredBlockReplication(); assert dataNodes.size() >= groupSize; // 1. RECEIVING_BLOCK IBR for (int i = 0; i < groupSize; i++) {