http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 36ce133,508da85..a64e50c --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@@ -42,7 -44,8 +44,9 @@@ import javax.management.ObjectName import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.StorageType; + import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; @@@ -52,12 -55,10 +56,11 @@@ import org.apache.hadoop.hdfs.protocol. import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; --import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@@ -77,22 -79,17 +81,24 @@@ import org.apache.hadoop.hdfs.server.pr import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; + import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; + + import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.Time; @@@ -818,11 -786,12 +835,11 @@@ public class BlockManager implements Bl } return locations; } -- - private List<LocatedBlock> createLocatedBlockList( - final BlockInfo[] 
blocks, ++ + private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks, final long offset, final long length, final int nrBlocksToReturn, final AccessMode mode) throws IOException { - int curBlk = 0; + int curBlk; long curPos = 0, blkSize = 0; int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length; for (curBlk = 0; curBlk < nrBlocks; curBlk++) { @@@ -875,25 -844,19 +892,26 @@@ } /** @return a LocatedBlock for the given block */ - private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) { + private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos - ) throws IOException { - if (blk instanceof BlockInfoContiguousUnderConstruction) { - if (blk.isComplete()) { - throw new IOException( - "blk instanceof BlockInfoUnderConstruction && blk.isComplete()" - + ", blk=" + blk); ++ ) throws IOException { + if (!blk.isComplete()) { + if (blk.isStriped()) { - final BlockInfoUnderConstructionStriped uc = - (BlockInfoUnderConstructionStriped) blk; ++ final BlockInfoStripedUnderConstruction uc = ++ (BlockInfoStripedUnderConstruction) blk; + final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); + final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), + blk); + return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos, + false); + } else { - assert blk instanceof BlockInfoUnderConstructionContiguous; - final BlockInfoUnderConstructionContiguous uc = - (BlockInfoUnderConstructionContiguous) blk; ++ assert blk instanceof BlockInfoContiguousUnderConstruction; ++ final BlockInfoContiguousUnderConstruction uc = ++ (BlockInfoContiguousUnderConstruction) blk; + final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); + final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), + blk); + return newLocatedBlock(eb, storages, pos, false); } - final BlockInfoContiguousUnderConstruction uc = - (BlockInfoContiguousUnderConstruction) blk; - final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); - final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); - return newLocatedBlock(eb, storages, pos, false); } // get block locations @@@ -1188,17 -1121,13 +1206,17 @@@ return; } StringBuilder datanodes = new StringBuilder(); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock, + State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - invalidateBlocks.add(b, node, false); - datanodes.append(node).append(" "); + final Block b = getBlockOnStorage(storedBlock, storage); + if (b != null) { + invalidateBlocks.add(b, node, false); + datanodes.append(node).append(" "); + } } if (datanodes.length() != 0) { - blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock, - blockLog.debug("BLOCK* addToInvalidates: {} {}", b, ++ blockLog.debug("BLOCK* addToInvalidates: {} {}", storedBlock, datanodes.toString()); } } @@@ -1267,8 -1188,8 +1285,8 @@@ DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException { - if (b.corrupted.isDeleted()) { + if (b.stored.isDeleted()) { - blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" + + blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" + " corrupt as it does not belong to any file", b); addToInvalidates(b.corrupted, node); return; @@@ -1323,9 -1237,9 +1341,9 @@@ * @return true if the block was successfully invalidated and no longer * present in the BlocksMap 
*/ - private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn - ) throws IOException { + private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn, + NumberReplicas nr) throws IOException { - blockLog.info("BLOCK* invalidateBlock: {} on {}", b, dn); + blockLog.debug("BLOCK* invalidateBlock: {} on {}", b, dn); DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { throw new IOException("Cannot invalidate " + b @@@ -1333,8 -1247,9 +1351,8 @@@ } // Check how many copies we have of the block - NumberReplicas nr = countNodes(b.stored); if (nr.replicasOnStaleNodes() > 0) { - blockLog.info("BLOCK* invalidateBlocks: postponing " + + blockLog.debug("BLOCK* invalidateBlocks: postponing " + "invalidation of {} on {} because {} replica(s) are located on " + "nodes with potentially out-of-date block reports", b, dn, nr.replicasOnStaleNodes()); @@@ -1478,12 -1391,12 +1496,12 @@@ // do not schedule more if enough replicas is already pending numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplications.getNumReplicas(block); - + if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (blockHasEnoughRacks(block, requiredReplication)) ) { neededReplications.remove(block, priority); // remove from neededReplications - blockLog.info("BLOCK* Removing {} from neededReplications as" + + blockLog.debug("BLOCK* Removing {} from neededReplications as" + " it has enough replicas", block); continue; } @@@ -1565,10 -1463,10 +1583,10 @@@ if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (blockHasEnoughRacks(block, requiredReplication)) ) { neededReplications.remove(block, priority); // remove from neededReplications rw.targets = null; - blockLog.info("BLOCK* Removing {} from neededReplications as" + + blockLog.debug("BLOCK* Removing {} from neededReplications as" + " it has enough replicas", block); continue; } @@@ -1637,11 -1510,11 +1655,11 @@@ DatanodeStorageInfo[] targets = rw.targets; if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); - for (int k = 0; k < targets.length; k++) { + for (DatanodeStorageInfo target : targets) { targetList.append(' '); - targetList.append(targets[k].getDatanodeDescriptor()); + targetList.append(target.getDatanodeDescriptor()); } - blockLog.info("BLOCK* ask {} to replicate {} to {}", rw.srcNodes, - blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode, ++ blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNodes, rw.block, targetList); } } @@@ -1882,11 -1765,8 +1921,11 @@@ final Block reportedBlock; final ReplicaState reportedState; - StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock, + StatefulBlockInfo(BlockInfo storedBlock, Block reportedBlock, ReplicaState reportedState) { + Preconditions.checkArgument( - storedBlock instanceof BlockInfoUnderConstructionContiguous || - storedBlock instanceof BlockInfoUnderConstructionStriped); ++ storedBlock instanceof BlockInfoContiguousUnderConstruction || ++ storedBlock instanceof BlockInfoStripedUnderConstruction); this.storedBlock = storedBlock; this.reportedBlock = reportedBlock; this.reportedState = reportedState; @@@ -2263,8 -2141,8 +2311,8 @@@ QUEUE_REASON_FUTURE_GENSTAMP); continue; } -- - BlockInfo storedBlock = blocksMap.getStoredBlock(iblk); ++ + BlockInfo storedBlock = getStoredBlock(iblk); // If block 
does not belong to any file, we are done. if (storedBlock == null) continue; @@@ -2306,9 -2186,9 +2354,9 @@@ } private void reportDiff(DatanodeStorageInfo storageInfo, -- BlockListAsLongs newReport, - Collection<BlockInfo> toAdd, // add to DatanodeDescriptor - Collection<Block> toRemove, // remove from DatanodeDescriptor ++ BlockListAsLongs newReport, + Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor + Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor Collection<Block> toInvalidate, // should be removed from DN Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list Collection<StatefulBlockInfo> toUC) { // add to under-construction list @@@ -2342,10 -2220,10 +2390,11 @@@ // collect blocks that have not been reported // all of them are next to the delimiter - Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0)); + Iterator<BlockInfo> it = + storageInfo.new BlockIterator(delimiter.getNext(0)); - while(it.hasNext()) + while (it.hasNext()) { toRemove.add(it.next()); + } storageInfo.removeBlock(delimiter); } @@@ -2382,8 -2260,8 +2431,8 @@@ */ private BlockInfo processReportedBlock( final DatanodeStorageInfo storageInfo, -- final Block block, final ReplicaState reportedState, - final Collection<BlockInfo> toAdd, ++ final Block block, final ReplicaState reportedState, + final Collection<BlockInfoToAdd> toAdd, final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC) { @@@ -2717,10 -2578,9 +2766,10 @@@ assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - if (block instanceof BlockInfoUnderConstructionContiguous || - block instanceof BlockInfoUnderConstructionStriped) { - if (block instanceof BlockInfoContiguousUnderConstruction) { ++ if (block instanceof BlockInfoContiguousUnderConstruction || ++ block instanceof BlockInfoStripedUnderConstruction) { //refresh our copy in case the block got completed in another thread - storedBlock = blocksMap.getStoredBlock(block); + storedBlock = getStoredBlock(block); } else { storedBlock = block; } @@@ -3275,26 -3055,6 +3325,26 @@@ } } + private void processChosenExcessReplica( + final Collection<DatanodeStorageInfo> nonExcess, + final DatanodeStorageInfo chosen, BlockInfo storedBlock) { + nonExcess.remove(chosen); + addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock); + // + // The 'excessblocks' tracks blocks until we get confirmation + // that the datanode has deleted them; the only way we remove them + // is when we get a "removeBlock" message. + // + // The 'invalidate' list is used to inform the datanode the block + // should be deleted. Items are removed from the invalidate list + // upon giving instructions to the datanodes. 
+ // + final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen); + addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor()); - blockLog.info("BLOCK* chooseExcessReplicates: " - +"({}, {}) is added to invalidated blocks set", chosen, storedBlock); ++ blockLog.debug("BLOCK* chooseExcessReplicates: " ++ + "({}, {}) is added to invalidated blocks set", chosen, storedBlock); + } + /** Check if we can use delHint */ static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, @@@ -3356,6 -3116,19 +3406,19 @@@ return; } + CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks() - .get(new CachedBlock(block.getBlockId(), (short) 0, false)); ++ .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false)); + if (cblock != null) { + boolean removed = false; + removed |= node.getPendingCached().remove(cblock); + removed |= node.getCached().remove(cblock); + removed |= node.getPendingUncached().remove(cblock); + if (removed) { + blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching " - + "related lists on node {}", block, node); ++ + "related lists on node {}", storedBlock, node); + } + } + // // It's possible that the block was removed because of a datanode // failure. If the block is still valid, check if replication is @@@ -3454,7 -3212,10 +3517,10 @@@ // // Modify the blocks->datanode map and node's map. // - pendingReplications.decrement(getStoredBlock(block), node); + BlockInfo storedBlock = getStoredBlock(block); + if (storedBlock != null) { - pendingReplications.decrement(getStoredBlock(block), node); ++ pendingReplications.decrement(storedBlock, node); + } processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } @@@ -4138,57 -3819,22 +4204,57 @@@ null); } - private static class ReplicationWork { - - private final BlockInfo block; - private final BlockCollection bc; - - private final DatanodeDescriptor srcNode; - private final List<DatanodeDescriptor> containingNodes; - private final List<DatanodeStorageInfo> liveReplicaStorages; - private final int additionalReplRequired; + public static LocatedStripedBlock newLocatedStripedBlock( + ExtendedBlock b, DatanodeStorageInfo[] storages, + int[] indices, long startOffset, boolean corrupt) { + // startOffset is unknown + return new LocatedStripedBlock( + b, DatanodeStorageInfo.toDatanodeInfos(storages), + DatanodeStorageInfo.toStorageIDs(storages), + DatanodeStorageInfo.toStorageTypes(storages), + indices, startOffset, corrupt, + null); + } - private DatanodeStorageInfo targets[]; - private final int priority; + public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info, + DatanodeStorageInfo[] locs, long offset) throws IOException { + final LocatedBlock lb; + if (info.isStriped()) { + lb = newLocatedStripedBlock(eb, locs, - ((BlockInfoUnderConstructionStriped)info).getBlockIndices(), ++ ((BlockInfoStripedUnderConstruction)info).getBlockIndices(), + offset, false); + } else { + lb = newLocatedBlock(eb, locs, offset, false); + } + return lb; + } - public ReplicationWork(BlockInfo block, + /** + * This class is used internally by {@link this#computeRecoveryWorkForBlocks} + * to represent a task to recover a block through replication or erasure + * coding. 
Recovery is done by transferring data from srcNodes to targets + */ + private abstract static class BlockRecoveryWork { + final BlockInfo block; + final BlockCollection bc; + + /** + * An erasure coding recovery task has multiple source nodes. + * A replication task only has 1 source node, stored on top of the array + */ + final DatanodeDescriptor[] srcNodes; + /** Nodes containing the block; avoid them in choosing new targets */ + final List<DatanodeDescriptor> containingNodes; + /** Required by {@link BlockPlacementPolicy#chooseTarget} */ + final List<DatanodeStorageInfo> liveReplicaStorages; + final int additionalReplRequired; + + DatanodeStorageInfo[] targets; + final int priority; + + BlockRecoveryWork(BlockInfo block, BlockCollection bc, - DatanodeDescriptor srcNode, + DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes, List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired,
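A minimal illustrative sketch of the striped-vs-contiguous dispatch that the createLocatedBlock()/newLocatedBlock() hunks above introduce: an incomplete striped block is reported together with its per-storage block indices, while a contiguous block only needs its replica locations. The class below is a simplified stand-in written for this example (an assumption, not a Hadoop type); only the isStriped branch mirrors the diff.

// Sketch only -- not the committed BlockManager code.
final class UcLocatedBlockSketch {
  static String describe(boolean isStriped, int storageCount, int[] blockIndices) {
    if (isStriped) {
      // striped: each expected storage holds one internal block, identified by
      // an index, so the indices travel with the located block
      return "LocatedStripedBlock over " + storageCount + " storages, indices="
          + java.util.Arrays.toString(blockIndices);
    }
    // contiguous: a plain located block over the expected replica locations
    return "LocatedBlock over " + storageCount + " storages";
  }

  public static void main(String[] args) {
    System.out.println(describe(true, 9, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8}));
    System.out.println(describe(false, 3, null));
  }
}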
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 9173920,0dbf485..5bfae42 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@@ -130,17 -123,13 +130,17 @@@ class BlocksMap return; blockInfo.setBlockCollection(null); - for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) { + final int size = blockInfo instanceof BlockInfoContiguous ? + blockInfo.numNodes() : blockInfo.getCapacity(); + for(int idx = size - 1; idx >= 0; idx--) { DatanodeDescriptor dn = blockInfo.getDatanode(idx); - dn.removeBlock(blockInfo); // remove from the list and wipe the location + if (dn != null) { + dn.removeBlock(blockInfo); // remove from the list and wipe the location + } } } -- - /** Returns the block object it it exists in the map. */ ++ + /** Returns the block object if it exists in the map. */ BlockInfo getStoredBlock(Block b) { return blocks.get(b); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 108ce2f,87ce753..87394f6 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@@ -31,10 -31,8 +31,7 @@@ import java.util.Queue import java.util.Set; import com.google.common.annotations.VisibleForTesting; - import com.google.common.collect.ImmutableList; - - import org.apache.commons.logging.Log; - import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageType; @@@ -50,9 -46,10 +47,11 @@@ import org.apache.hadoop.hdfs.server.pr import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.LightWeightHashSet; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.util.IntrusiveCollection; import org.apache.hadoop.util.Time; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; /** * This class extends the DatanodeInfo class with ephemeral information (eg http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java ---------------------------------------------------------------------- diff --cc 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 2275d91,216d6d2..bb9a706 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@@ -253,18 -252,10 +254,18 @@@ public class DatanodeStorageInfo } // add to the head of the data-node list - b.addStorage(this); + b.addStorage(this, reportedBlock); + insertToList(b); + return result; + } + - AddBlockResult addBlock(BlockInfoContiguous b) { ++ AddBlockResult addBlock(BlockInfo b) { + return addBlock(b, b); + } + + public void insertToList(BlockInfo b) { blockList = b.listInsert(blockList, this); numBlocks++; - return result; } public boolean removeBlock(BlockInfo b) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java index 47afb05,ebc15b8..7e8f479 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java @@@ -240,10 -206,10 +241,10 @@@ class UnderReplicatedBlocks implements /** remove a block from a under replication queue */ synchronized boolean remove(BlockInfo block, - int oldReplicas, + int oldReplicas, int decommissionedReplicas, int oldExpectedReplicas) { - int priLevel = getPriority(oldReplicas, + int priLevel = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas); boolean removedBlock = remove(block, priLevel); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index dc6acd5,afacebb..34d92d0 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@@ -368,17 -356,15 +369,16 @@@ public class Mover final BlockStoragePolicy policy = blockStoragePolicies[policyId]; if (policy == null) { LOG.warn("Failed to get the storage policy of file " + fullPath); - return false; + return; } - final List<StorageType> types = policy.chooseStorageTypes( + List<StorageType> types = policy.chooseStorageTypes( status.getReplication()); + final ErasureCodingPolicy ecPolicy = status.getErasureCodingPolicy(); final LocatedBlocks locatedBlocks = status.getBlockLocations(); - boolean hasRemaining = false; final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete(); List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks(); - for(int i = 0; i < lbs.size(); i++) { + for (int i = 0; i < lbs.size(); i++) { if (i == lbs.size() - 1 && !lastBlkComplete) { // last block is incomplete, skip it continue; @@@ -390,22 -373,22 +390,25 @@@ final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes()); if (!diff.removeOverlap(true)) { - if (scheduleMoves4Block(diff, lb)) { + if (scheduleMoves4Block(diff, lb, ecPolicy)) { - hasRemaining |= (diff.existing.size() > 1 && - diff.expected.size() > 1); + result.updateHasRemaining(diff.existing.size() > 1 + && diff.expected.size() > 1); + // One block scheduled successfully, set noBlockMoved to false + result.setNoBlockMoved(false); + } else { + result.updateHasRemaining(true); } } } - return hasRemaining; } - boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) { + boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb, + ErasureCodingPolicy ecPolicy) { final List<MLocation> locations = MLocation.toLocations(lb); - Collections.shuffle(locations); - final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations); + if (!(lb instanceof LocatedStripedBlock)) { + Collections.shuffle(locations); + } + final DBlock db = newDBlock(lb, locations, ecPolicy); for (final StorageType t : diff.existing) { for (final MLocation ml : locations) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java index 0000000,3d79d09..6abca16 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java @@@ -1,0 -1,261 +1,268 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs.server.namenode; + + import java.io.FileNotFoundException; + import java.io.IOException; + import java.util.List; + + import org.apache.hadoop.fs.FileAlreadyExistsException; + import org.apache.hadoop.fs.StorageType; + import org.apache.hadoop.fs.permission.FsAction; + import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; + import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; + import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; + import org.apache.hadoop.hdfs.protocol.LocatedBlock; + import org.apache.hadoop.hdfs.protocol.QuotaExceededException; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; + import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; + import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature; + + import com.google.common.base.Preconditions; + + /** + * Helper class to perform append operation. + */ + final class FSDirAppendOp { + + /** + * Private constructor for preventing FSDirAppendOp object creation. + * Static-only class. + */ + private FSDirAppendOp() {} + + /** + * Append to an existing file. + * <p> + * + * The method returns the last block of the file if this is a partial block, + * which can still be used for writing more data. The client uses the + * returned block locations to form the data pipeline for this block.<br> + * The {@link LocatedBlock} will be null if the last block is full. + * The client then allocates a new block with the next call using + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#addBlock}. 
+ * <p> + * + * For description of parameters and exceptions thrown see + * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol#append} + * + * @param fsn namespace + * @param srcArg path name + * @param pc permission checker to check fs permission + * @param holder client name + * @param clientMachine client machine info + * @param newBlock if the data is appended to a new block + * @param logRetryCache whether to record RPC ids in editlog for retry cache + * rebuilding + * @return the last block with status + */ + static LastBlockWithStatus appendFile(final FSNamesystem fsn, + final String srcArg, final FSPermissionChecker pc, final String holder, + final String clientMachine, final boolean newBlock, + final boolean logRetryCache) throws IOException { + assert fsn.hasWriteLock(); + + final byte[][] pathComponents = FSDirectory + .getPathComponentsForReservedPath(srcArg); + final LocatedBlock lb; + final FSDirectory fsd = fsn.getFSDirectory(); + final String src; + fsd.writeLock(); + try { + src = fsd.resolvePath(pc, srcArg, pathComponents); + final INodesInPath iip = fsd.getINodesInPath4Write(src); + // Verify that the destination does not exist as a directory already + final INode inode = iip.getLastINode(); + final String path = iip.getPath(); + if (inode != null && inode.isDirectory()) { + throw new FileAlreadyExistsException("Cannot append to directory " + + path + "; already exists as a directory."); + } + if (fsd.isPermissionEnabled()) { + fsd.checkPathAccess(pc, iip, FsAction.WRITE); + } + + if (inode == null) { + throw new FileNotFoundException( + "Failed to append to non-existent file " + path + " for client " + + clientMachine); + } + final INodeFile file = INodeFile.valueOf(inode, path, true); ++ ++ // not support appending file with striped blocks ++ if (file.isStriped()) { ++ throw new UnsupportedOperationException( ++ "Cannot truncate file with striped block " + src); ++ } ++ + BlockManager blockManager = fsd.getBlockManager(); + final BlockStoragePolicy lpPolicy = blockManager + .getStoragePolicy("LAZY_PERSIST"); + if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) { + throw new UnsupportedOperationException( + "Cannot append to lazy persist file " + path); + } + // Opening an existing file for append - may need to recover lease. + fsn.recoverLeaseInternal(RecoverLeaseOp.APPEND_FILE, iip, path, holder, + clientMachine, false); + + final BlockInfo lastBlock = file.getLastBlock(); + // Check that the block has at least minimum replication. + if (lastBlock != null && lastBlock.isComplete() + && !blockManager.isSufficientlyReplicated(lastBlock)) { + throw new IOException("append: lastBlock=" + lastBlock + " of src=" + + path + " is not sufficiently replicated yet."); + } + lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock, + true, logRetryCache); + } catch (IOException ie) { + NameNode.stateChangeLog + .warn("DIR* NameSystem.append: " + ie.getMessage()); + throw ie; + } finally { + fsd.writeUnlock(); + } + + HdfsFileStatus stat = FSDirStatAndListingOp.getFileInfo(fsd, src, false, + FSDirectory.isReservedRawName(srcArg)); + if (lb != null) { + NameNode.stateChangeLog.debug( + "DIR* NameSystem.appendFile: file {} for {} at {} block {} block" + + " size {}", srcArg, holder, clientMachine, lb.getBlock(), lb + .getBlock().getNumBytes()); + } + return new LastBlockWithStatus(lb, stat); + } + + /** + * Convert current node to under construction. + * Recreate in-memory lease record. 
+ * + * @param fsn namespace + * @param iip inodes in the path containing the file + * @param leaseHolder identifier of the lease holder on this file + * @param clientMachine identifier of the client machine + * @param newBlock if the data is appended to a new block + * @param writeToEditLog whether to persist this change to the edit log + * @param logRetryCache whether to record RPC ids in editlog for retry cache + * rebuilding + * @return the last block locations if the block is partial or null otherwise + * @throws IOException + */ + static LocatedBlock prepareFileForAppend(final FSNamesystem fsn, + final INodesInPath iip, final String leaseHolder, + final String clientMachine, final boolean newBlock, + final boolean writeToEditLog, final boolean logRetryCache) + throws IOException { + assert fsn.hasWriteLock(); + + final INodeFile file = iip.getLastINode().asFile(); + final QuotaCounts delta = verifyQuotaForUCBlock(fsn, file, iip); + + file.recordModification(iip.getLatestSnapshotId()); + file.toUnderConstruction(leaseHolder, clientMachine); + + fsn.getLeaseManager().addLease( + file.getFileUnderConstructionFeature().getClientName(), file.getId()); + + LocatedBlock ret = null; + if (!newBlock) { + FSDirectory fsd = fsn.getFSDirectory(); + ret = fsd.getBlockManager().convertLastBlockToUnderConstruction(file, 0); + if (ret != null && delta != null) { + Preconditions.checkState(delta.getStorageSpace() >= 0, "appending to" + + " a block with size larger than the preferred block size"); + fsd.writeLock(); + try { + fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta); + } finally { + fsd.writeUnlock(); + } + } + } else { + BlockInfo lastBlock = file.getLastBlock(); + if (lastBlock != null) { + ExtendedBlock blk = new ExtendedBlock(fsn.getBlockPoolId(), lastBlock); + ret = new LocatedBlock(blk, new DatanodeInfo[0]); + } + } + + if (writeToEditLog) { + final String path = iip.getPath(); + if (NameNodeLayoutVersion.supports(Feature.APPEND_NEW_BLOCK, + fsn.getEffectiveLayoutVersion())) { + fsn.getEditLog().logAppendFile(path, file, newBlock, logRetryCache); + } else { + fsn.getEditLog().logOpenFile(path, file, false, logRetryCache); + } + } + return ret; + } + + /** + * Verify quota when using the preferred block size for UC block. This is + * usually used by append and truncate. + * + * @throws QuotaExceededException when violating the storage quota + * @return expected quota usage update. null means no change or no need to + * update quota usage later + */ + private static QuotaCounts verifyQuotaForUCBlock(FSNamesystem fsn, + INodeFile file, INodesInPath iip) throws QuotaExceededException { + FSDirectory fsd = fsn.getFSDirectory(); + if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) { + // Do not check quota if editlog is still being processed + return null; + } + if (file.getLastBlock() != null) { + final QuotaCounts delta = computeQuotaDeltaForUCBlock(fsn, file); + fsd.readLock(); + try { + FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null); + return delta; + } finally { + fsd.readUnlock(); + } + } + return null; + } + + /** Compute quota change for converting a complete block to a UC block. 
*/ + private static QuotaCounts computeQuotaDeltaForUCBlock(FSNamesystem fsn, + INodeFile file) { + final QuotaCounts delta = new QuotaCounts.Builder().build(); + final BlockInfo lastBlock = file.getLastBlock(); + if (lastBlock != null) { + final long diff = file.getPreferredBlockSize() - lastBlock.getNumBytes(); + final short repl = file.getPreferredBlockReplication(); + delta.addStorageSpace(diff * repl); + final BlockStoragePolicy policy = fsn.getFSDirectory() + .getBlockStoragePolicySuite().getPolicy(file.getStoragePolicyID()); + List<StorageType> types = policy.chooseStorageTypes(repl); + for (StorageType t : types) { + if (t.supportTypeQuota()) { + delta.addTypeSpace(t, diff); + } + } + } + return delta; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java index bad7c42,4a45074..6ec97c9 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirStatAndListingOp.java @@@ -28,9 -27,8 +28,10 @@@ import org.apache.hadoop.fs.InvalidPath import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSUtil; + import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.FsPermissionExtension; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@@ -140,9 -139,69 +142,71 @@@ class FSDirStatAndListingOp return getContentSummaryInt(fsd, iip); } + /** + * Get block locations within the specified range. + * @see ClientProtocol#getBlockLocations(String, long, long) + * @throws IOException + */ + static GetBlockLocationsResult getBlockLocations( + FSDirectory fsd, FSPermissionChecker pc, String src, long offset, + long length, boolean needBlockToken) throws IOException { + Preconditions.checkArgument(offset >= 0, + "Negative offset is not supported. File: " + src); + Preconditions.checkArgument(length >= 0, + "Negative length is not supported. 
File: " + src); + CacheManager cm = fsd.getFSNamesystem().getCacheManager(); + BlockManager bm = fsd.getBlockManager(); + byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + boolean isReservedName = FSDirectory.isReservedRawName(src); + fsd.readLock(); + try { + src = fsd.resolvePath(pc, src, pathComponents); + final INodesInPath iip = fsd.getINodesInPath(src, true); + final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src); + if (fsd.isPermissionEnabled()) { + fsd.checkPathAccess(pc, iip, FsAction.READ); + fsd.checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId()); + } + + final long fileSize = iip.isSnapshot() + ? inode.computeFileSize(iip.getPathSnapshotId()) + : inode.computeFileSizeNotIncludingLastUcBlock(); + + boolean isUc = inode.isUnderConstruction(); + if (iip.isSnapshot()) { + // if src indicates a snapshot file, we need to make sure the returned + // blocks do not exceed the size of the snapshot file. + length = Math.min(length, fileSize - offset); + isUc = false; + } + + final FileEncryptionInfo feInfo = isReservedName ? null + : fsd.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip); ++ final ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone( ++ fsd.getFSNamesystem(), iip); + + final LocatedBlocks blocks = bm.createLocatedBlocks( + inode.getBlocks(iip.getPathSnapshotId()), fileSize, isUc, offset, - length, needBlockToken, iip.isSnapshot(), feInfo); ++ length, needBlockToken, iip.isSnapshot(), feInfo, ecZone); + + // Set caching information for the located blocks. + for (LocatedBlock lb : blocks.getLocatedBlocks()) { + cm.setCachedLocations(lb); + } + + final long now = now(); + boolean updateAccessTime = fsd.isAccessTimeSupported() + && !iip.isSnapshot() + && now > inode.getAccessTime() + fsd.getAccessTimePrecision(); + return new GetBlockLocationsResult(updateAccessTime, blocks); + } finally { + fsd.readUnlock(); + } + } + private static byte getStoragePolicyID(byte inodePolicy, byte parentPolicy) { - return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED ? inodePolicy : - parentPolicy; + return inodePolicy != HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED + ? inodePolicy : parentPolicy; } /** @@@ -457,9 -505,9 +518,9 @@@ final long fileSize = !inSnapshot && isUc ? fileNode.computeFileSizeNotIncludingLastUcBlock() : size; - loc = fsd.getFSNamesystem().getBlockManager().createLocatedBlocks( + loc = fsd.getBlockManager().createLocatedBlocks( fileNode.getBlocks(snapshot), fileSize, isUc, 0L, size, false, - inSnapshot, feInfo); + inSnapshot, feInfo, ecZone); if (loc == null) { loc = new LocatedBlocks(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java index 0000000,474c257..c2c4155 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java @@@ -1,0 -1,360 +1,371 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs.server.namenode; + + import java.io.IOException; + + import org.apache.hadoop.HadoopIllegalArgumentException; + import org.apache.hadoop.fs.UnresolvedLinkException; + import org.apache.hadoop.fs.permission.FsAction; + import org.apache.hadoop.hdfs.protocol.Block; + import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; + import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; + import org.apache.hadoop.hdfs.protocol.QuotaExceededException; + import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; ++import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; + import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; + import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; + import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; + + import com.google.common.annotations.VisibleForTesting; + + /** + * Helper class to perform truncate operation. + */ + final class FSDirTruncateOp { + + /** + * Private constructor for preventing FSDirTruncateOp object creation. + * Static-only class. + */ + private FSDirTruncateOp() {} + + /** + * Truncate a file to a given size. 
+ * + * @param fsn namespace + * @param srcArg path name + * @param newLength the target file size + * @param clientName client name + * @param clientMachine client machine info + * @param mtime modified time + * @param toRemoveBlocks to be removed blocks + * @param pc permission checker to check fs permission + * @return tuncate result + * @throws IOException + */ + static TruncateResult truncate(final FSNamesystem fsn, final String srcArg, + final long newLength, final String clientName, + final String clientMachine, final long mtime, + final BlocksMapUpdateInfo toRemoveBlocks, final FSPermissionChecker pc) + throws IOException, UnresolvedLinkException { + assert fsn.hasWriteLock(); + + FSDirectory fsd = fsn.getFSDirectory(); + byte[][] pathComponents = FSDirectory + .getPathComponentsForReservedPath(srcArg); + final String src; + final INodesInPath iip; + final boolean onBlockBoundary; + Block truncateBlock = null; + fsd.writeLock(); + try { + src = fsd.resolvePath(pc, srcArg, pathComponents); + iip = fsd.getINodesInPath4Write(src, true); + if (fsd.isPermissionEnabled()) { + fsd.checkPathAccess(pc, iip, FsAction.WRITE); + } + INodeFile file = INodeFile.valueOf(iip.getLastINode(), src); + final BlockStoragePolicy lpPolicy = fsd.getBlockManager() + .getStoragePolicy("LAZY_PERSIST"); + ++ // not support truncating file with striped blocks ++ if (file.isStriped()) { ++ throw new UnsupportedOperationException( ++ "Cannot truncate file with striped block " + src); ++ } ++ + if (lpPolicy != null && lpPolicy.getId() == file.getStoragePolicyID()) { + throw new UnsupportedOperationException( + "Cannot truncate lazy persist file " + src); + } + + // Check if the file is already being truncated with the same length + final BlockInfo last = file.getLastBlock(); + if (last != null && last.getBlockUCState() + == BlockUCState.UNDER_RECOVERY) { + final Block truncatedBlock = ((BlockInfoContiguousUnderConstruction) last) + .getTruncateBlock(); + if (truncatedBlock != null) { + final long truncateLength = file.computeFileSize(false, false) + + truncatedBlock.getNumBytes(); + if (newLength == truncateLength) { + return new TruncateResult(false, fsd.getAuditFileInfo(iip)); + } + } + } + + // Opening an existing file for truncate. May need lease recovery. + fsn.recoverLeaseInternal(RecoverLeaseOp.TRUNCATE_FILE, iip, src, + clientName, clientMachine, false); + // Truncate length check. + long oldLength = file.computeFileSize(); + if (oldLength == newLength) { + return new TruncateResult(true, fsd.getAuditFileInfo(iip)); + } + if (oldLength < newLength) { + throw new HadoopIllegalArgumentException( + "Cannot truncate to a larger file size. Current size: " + oldLength + + ", truncate size: " + newLength + "."); + } + // Perform INodeFile truncation. 
+ final QuotaCounts delta = new QuotaCounts.Builder().build(); + onBlockBoundary = unprotectedTruncate(fsn, iip, newLength, + toRemoveBlocks, mtime, delta); + if (!onBlockBoundary) { + // Open file for write, but don't log into edits + long lastBlockDelta = file.computeFileSize() - newLength; + assert lastBlockDelta > 0 : "delta is 0 only if on block bounday"; + truncateBlock = prepareFileForTruncate(fsn, iip, clientName, + clientMachine, lastBlockDelta, null); + } + + // update the quota: use the preferred block size for UC block + fsd.updateCountNoQuotaCheck(iip, iip.length() - 1, delta); + } finally { + fsd.writeUnlock(); + } + + fsn.getEditLog().logTruncate(src, clientName, clientMachine, newLength, + mtime, truncateBlock); + return new TruncateResult(onBlockBoundary, fsd.getAuditFileInfo(iip)); + } + + /** + * Unprotected truncate implementation. Unlike + * {@link FSDirTruncateOp#truncate}, this will not schedule block recovery. + * + * @param fsn namespace + * @param src path name + * @param clientName client name + * @param clientMachine client machine info + * @param newLength the target file size + * @param mtime modified time + * @param truncateBlock truncate block + * @throws IOException + */ + static void unprotectedTruncate(final FSNamesystem fsn, final String src, + final String clientName, final String clientMachine, + final long newLength, final long mtime, final Block truncateBlock) + throws UnresolvedLinkException, QuotaExceededException, + SnapshotAccessControlException, IOException { + assert fsn.hasWriteLock(); + + FSDirectory fsd = fsn.getFSDirectory(); + INodesInPath iip = fsd.getINodesInPath(src, true); + INodeFile file = iip.getLastINode().asFile(); + BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo(); + boolean onBlockBoundary = unprotectedTruncate(fsn, iip, newLength, + collectedBlocks, mtime, null); + + if (!onBlockBoundary) { + BlockInfo oldBlock = file.getLastBlock(); + Block tBlk = prepareFileForTruncate(fsn, iip, clientName, clientMachine, + file.computeFileSize() - newLength, truncateBlock); + assert Block.matchingIdAndGenStamp(tBlk, truncateBlock) && + tBlk.getNumBytes() == truncateBlock.getNumBytes() : + "Should be the same block."; + if (oldBlock.getBlockId() != tBlk.getBlockId() + && !file.isBlockInLatestSnapshot(oldBlock)) { + fsd.getBlockManager().removeBlockFromMap(oldBlock); + } + } + assert onBlockBoundary == (truncateBlock == null) : + "truncateBlock is null iff on block boundary: " + truncateBlock; + fsn.removeBlocksAndUpdateSafemodeTotal(collectedBlocks); + } + + /** + * Convert current INode to UnderConstruction. Recreate lease. Create new + * block for the truncated copy. Schedule truncation of the replicas. + * + * @param fsn namespace + * @param iip inodes in the path containing the file + * @param leaseHolder lease holder + * @param clientMachine client machine info + * @param lastBlockDelta last block delta size + * @param newBlock new block + * @return the returned block will be written to editLog and passed back + * into this method upon loading. 
+ * @throws IOException + */ + @VisibleForTesting + static Block prepareFileForTruncate(FSNamesystem fsn, INodesInPath iip, + String leaseHolder, String clientMachine, long lastBlockDelta, + Block newBlock) throws IOException { + assert fsn.hasWriteLock(); + + INodeFile file = iip.getLastINode().asFile(); + file.recordModification(iip.getLatestSnapshotId()); + file.toUnderConstruction(leaseHolder, clientMachine); + assert file.isUnderConstruction() : "inode should be under construction."; + fsn.getLeaseManager().addLease( + file.getFileUnderConstructionFeature().getClientName(), file.getId()); + boolean shouldRecoverNow = (newBlock == null); + BlockInfo oldBlock = file.getLastBlock(); ++ assert !oldBlock.isStriped(); ++ + boolean shouldCopyOnTruncate = shouldCopyOnTruncate(fsn, file, oldBlock); + if (newBlock == null) { - newBlock = (shouldCopyOnTruncate) ? fsn.createNewBlock() : new Block( ++ newBlock = (shouldCopyOnTruncate) ? ++ fsn.createNewBlock(file.isStriped()) : new Block( + oldBlock.getBlockId(), oldBlock.getNumBytes(), + fsn.nextGenerationStamp(fsn.getBlockIdManager().isLegacyBlock( + oldBlock))); + } + + BlockInfoContiguousUnderConstruction truncatedBlockUC; + BlockManager blockManager = fsn.getFSDirectory().getBlockManager(); + if (shouldCopyOnTruncate) { + // Add new truncateBlock into blocksMap and + // use oldBlock as a source for copy-on-truncate recovery + truncatedBlockUC = new BlockInfoContiguousUnderConstruction(newBlock, + file.getPreferredBlockReplication()); + truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta); + truncatedBlockUC.setTruncateBlock(oldBlock); - file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock)); ++ file.convertLastBlockToUC(truncatedBlockUC, ++ blockManager.getStorages(oldBlock)); + blockManager.addBlockCollection(truncatedBlockUC, file); + + NameNode.stateChangeLog.debug( + "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new" + + " size {} new block {} old block {}", + truncatedBlockUC.getNumBytes(), newBlock, + truncatedBlockUC.getTruncateBlock()); + } else { + // Use new generation stamp for in-place truncate recovery + blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta); + oldBlock = file.getLastBlock(); + assert !oldBlock.isComplete() : "oldBlock should be under construction"; + truncatedBlockUC = (BlockInfoContiguousUnderConstruction) oldBlock; + truncatedBlockUC.setTruncateBlock(new Block(oldBlock)); + truncatedBlockUC.getTruncateBlock().setNumBytes( + oldBlock.getNumBytes() - lastBlockDelta); + truncatedBlockUC.getTruncateBlock().setGenerationStamp( + newBlock.getGenerationStamp()); + + NameNode.stateChangeLog.debug( + "BLOCK* prepareFileForTruncate: {} Scheduling in-place block " + + "truncate to new size {}", truncatedBlockUC.getTruncateBlock() + .getNumBytes(), truncatedBlockUC); + } + if (shouldRecoverNow) { + truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp()); + } + + return newBlock; + } + + /** + * Truncate has the following properties: + * 1.) Any block deletions occur now. + * 2.) INode length is truncated now - new clients can only read up to + * the truncated length. + * 3.) INode will be set to UC and lastBlock set to UNDER_RECOVERY. + * 4.) NN will trigger DN truncation recovery and waits for DNs to report. + * 5.) File is considered UNDER_RECOVERY until truncation recovery + * completes. + * 6.) Soft and hard Lease expiration require truncation recovery to + * complete. 
+ * + * @return true if on the block boundary or false if recovery is need + */ + private static boolean unprotectedTruncate(FSNamesystem fsn, + INodesInPath iip, long newLength, BlocksMapUpdateInfo collectedBlocks, + long mtime, QuotaCounts delta) throws IOException { + assert fsn.hasWriteLock(); + + INodeFile file = iip.getLastINode().asFile(); + int latestSnapshot = iip.getLatestSnapshotId(); + file.recordModification(latestSnapshot, true); + + verifyQuotaForTruncate(fsn, iip, file, newLength, delta); + + long remainingLength = + file.collectBlocksBeyondMax(newLength, collectedBlocks); + file.excludeSnapshotBlocks(latestSnapshot, collectedBlocks); + file.setModificationTime(mtime); + // return whether on a block boundary + return (remainingLength - newLength) == 0; + } + + private static void verifyQuotaForTruncate(FSNamesystem fsn, + INodesInPath iip, INodeFile file, long newLength, QuotaCounts delta) + throws QuotaExceededException { + FSDirectory fsd = fsn.getFSDirectory(); + if (!fsn.isImageLoaded() || fsd.shouldSkipQuotaChecks()) { + // Do not check quota if edit log is still being processed + return; + } + final BlockStoragePolicy policy = fsd.getBlockStoragePolicySuite() + .getPolicy(file.getStoragePolicyID()); + file.computeQuotaDeltaForTruncate(newLength, policy, delta); + fsd.readLock(); + try { + FSDirectory.verifyQuota(iip, iip.length() - 1, delta, null); + } finally { + fsd.readUnlock(); + } + } + + /** + * Defines if a replica needs to be copied on truncate or + * can be truncated in place. + */ + private static boolean shouldCopyOnTruncate(FSNamesystem fsn, INodeFile file, + BlockInfo blk) { + if (!fsn.isUpgradeFinalized()) { + return true; + } + if (fsn.isRollingUpgrade()) { + return true; + } + return file.isBlockInLatestSnapshot(blk); + } + + /** + * Result of truncate operation. + */ + static class TruncateResult { + private final boolean result; + private final HdfsFileStatus stat; + + public TruncateResult(boolean result, HdfsFileStatus stat) { + this.result = result; + this.stat = stat; + } + + /** + * @return true if client does not need to wait for block recovery, + * false if client needs to wait for block recovery. + */ + boolean getResult() { + return result; + } + + /** + * @return file information. + */ + HdfsFileStatus getFileStatus() { + return stat; + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 8f4f51a,3d30a19..086aa05 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@@ -45,9 -43,7 +45,9 @@@ import org.apache.hadoop.hdfs.protocol. 
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; - import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous; - import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionStriped; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; ++import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@@ -200,21 -195,18 +200,21 @@@ class FSDirWriteFileOp blockSize = pendingFile.getPreferredBlockSize(); clientMachine = pendingFile.getFileUnderConstructionFeature() .getClientMachine(); - replication = pendingFile.getFileReplication(); + isStriped = pendingFile.isStriped(); + numTargets = isStriped ? + HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS : + pendingFile.getFileReplication(); storagePolicyID = pendingFile.getStoragePolicyID(); - return new ValidateAddBlockResult(blockSize, replication, storagePolicyID, - clientMachine); + return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID, + clientMachine, isStriped); } - static LocatedBlock makeLocatedBlock(FSNamesystem fsn, Block blk, + static LocatedBlock makeLocatedBlock(FSNamesystem fsn, BlockInfo blk, DatanodeStorageInfo[] locs, long offset) throws IOException { LocatedBlock lBlk = BlockManager.newLocatedBlock(fsn.getExtendedBlock(blk), - locs, offset, false); - fsn.getFSDirectory().getBlockManager() - .setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE); + blk, locs, offset); - fsn.getBlockManager().setBlockToken(lBlk, - BlockTokenIdentifier.AccessMode.WRITE); ++ fsn.getFSDirectory().getBlockManager(). 
++ setBlockToken(lBlk, BlockTokenIdentifier.AccessMode.WRITE); return lBlk; } @@@ -527,31 -515,17 +526,31 @@@ final INodeFile fileINode = inodesInPath.getLastINode().asFile(); Preconditions.checkState(fileINode.isUnderConstruction()); - // check quota limits and updated space consumed - fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), - fileINode.getPreferredBlockReplication(), true); - // associate new last block for the file - BlockInfoContiguousUnderConstruction blockInfo = - new BlockInfoContiguousUnderConstruction( - block, - fileINode.getFileReplication(), - HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, + final BlockInfo blockInfo; + if (isStriped) { + ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone( + fsd.getFSNamesystem(), inodesInPath); + ErasureCodingPolicy ecPolicy = ecZone.getErasureCodingPolicy(); + short numDataUnits = (short) ecPolicy.getNumDataUnits(); + short numParityUnits = (short) ecPolicy.getNumParityUnits(); + short numLocations = (short) (numDataUnits + numParityUnits); + + // check quota limits and updated space consumed + fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), + numLocations, true); - blockInfo = new BlockInfoUnderConstructionStriped(block, ecPolicy, ++ blockInfo = new BlockInfoStripedUnderConstruction(block, ecPolicy, + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); + } else { + // check quota limits and updated space consumed + fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), + fileINode.getPreferredBlockReplication(), true); + + short numLocations = fileINode.getFileReplication(); - blockInfo = new BlockInfoUnderConstructionContiguous(block, ++ blockInfo = new BlockInfoContiguousUnderConstruction(block, + numLocations, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); + } fsd.getBlockManager().addBlockCollection(blockInfo, fileINode); fileINode.addBlock(blockInfo); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index b5b7178,3dd076d..008a327 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@@ -36,20 -36,16 +36,20 @@@ import org.apache.hadoop.classification import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; - import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionStriped; ++import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; 
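In the block-allocation hunks above, the number of target storages and the provisional quota charge stop being tied to the replication factor once the file lives in an erasure coding zone: a striped block group needs numDataUnits + numParityUnits storages. A small worked sketch of that arithmetic (plain Java, not NameNode code; the 6+3 policy and 128 MB preferred block size are assumed values):

public class BlockTargetsSketch {
  public static void main(String[] args) {
    long preferredBlockSize = 128L * 1024 * 1024;  // assumed preferred block size

    // Replicated file: one target per replica, quota charged per replica.
    short replication = 3;
    System.out.println("replicated: targets=" + replication
        + " quotaCharge=" + preferredBlockSize * replication);

    // Striped file: targets come from the EC policy, e.g. RS-6-3, mirroring
    // numTargets = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS in the hunk above.
    short numDataUnits = 6;
    short numParityUnits = 3;
    short numLocations = (short) (numDataUnits + numParityUnits);
    System.out.println("striped: targets=" + numLocations
        + " quotaCharge=" + preferredBlockSize * numLocations);
  }
}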
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LayoutVersion; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; - import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; @@@ -988,17 -969,10 +988,17 @@@ public class FSEditLogLoader Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0); } // add the new block - BlockInfo newBI = new BlockInfoContiguousUnderConstruction( - newBlock, file.getPreferredBlockReplication()); - fsNamesys.getBlockManager().addBlockCollection(newBI, file); - file.addBlock(newBI); + final BlockInfo newBlockInfo; + boolean isStriped = ecZone != null; + if (isStriped) { - newBlockInfo = new BlockInfoUnderConstructionStriped(newBlock, ++ newBlockInfo = new BlockInfoStripedUnderConstruction(newBlock, + ecZone.getErasureCodingPolicy()); + } else { - newBlockInfo = new BlockInfoUnderConstructionContiguous(newBlock, ++ newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock, + file.getPreferredBlockReplication()); + } + fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBlockInfo, file); + file.addBlock(newBlockInfo); fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock); } @@@ -1076,13 -1049,8 +1076,13 @@@ // TODO: shouldn't this only be true for the last block? // what about an old-version fsync() where fsync isn't called // until several blocks in? - newBI = new BlockInfoContiguousUnderConstruction( - newBlock, file.getPreferredBlockReplication()); + if (isStriped) { - newBI = new BlockInfoUnderConstructionStriped(newBlock, ++ newBI = new BlockInfoStripedUnderConstruction(newBlock, + ecZone.getErasureCodingPolicy()); + } else { - newBI = new BlockInfoUnderConstructionContiguous(newBlock, ++ newBI = new BlockInfoContiguousUnderConstruction(newBlock, + file.getPreferredBlockReplication()); + } } else { // OP_CLOSE should add finalized blocks. 
This code path // is only executed when loading edits written by prior http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index 2e490e7,30517d0..e7c87d6 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@@ -778,9 -776,9 +778,9 @@@ public class FSImageFormat clientMachine = FSImageSerialization.readString(in); // convert the last block to BlockUC if (blocks.length > 0) { - BlockInfo lastBlk = blocks[blocks.length - 1]; - blocks[blocks.length - 1] = new BlockInfoContiguousUnderConstruction( - lastBlk, replication); + Block lastBlk = blocks[blocks.length - 1]; + blocks[blocks.length - 1] = - new BlockInfoUnderConstructionContiguous(lastBlk, replication); ++ new BlockInfoContiguousUnderConstruction(lastBlk, replication); } } } @@@ -1144,7 -1141,7 +1144,7 @@@ + " option to automatically rename these paths during upgrade."; /** -- * Same as {@link #renameReservedPathsOnUpgrade(String)}, but for a single ++ * Same as {@link #renameReservedPathsOnUpgrade}, but for a single * byte array path component. */ private static byte[] renameReservedComponentOnUpgrade(byte[] component, @@@ -1164,7 -1161,7 +1164,7 @@@ } /** -- * Same as {@link #renameReservedPathsOnUpgrade(String)}, but for a single ++ * Same as {@link #renameReservedPathsOnUpgrade}, but for a single * byte array path component. */ private static byte[] renameReservedRootComponentOnUpgrade(byte[] component, http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 653bd4a,e8378e5..51b04d0 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@@ -45,9 -44,7 +45,9 @@@ import org.apache.hadoop.hdfs.protocol. 
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; - import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; - import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionStriped; ++import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext; import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext; @@@ -366,15 -364,8 +375,15 @@@ public final class FSImageFormatPBINod if (blocks.length > 0) { BlockInfo lastBlk = file.getLastBlock(); // replace the last block of file - file.setBlock(file.numBlocks() - 1, new BlockInfoContiguousUnderConstruction( - lastBlk, replication)); + final BlockInfo ucBlk; + if (isStriped) { + BlockInfoStriped striped = (BlockInfoStriped) lastBlk; - ucBlk = new BlockInfoUnderConstructionStriped(striped, ecPolicy); ++ ucBlk = new BlockInfoStripedUnderConstruction(striped, ecPolicy); + } else { - ucBlk = new BlockInfoUnderConstructionContiguous(lastBlk, ++ ucBlk = new BlockInfoContiguousUnderConstruction(lastBlk, + replication); + } + file.setBlock(file.numBlocks() - 1, ucBlk); } } return file; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java index d87378c,f71cf0b..af3f813 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java @@@ -33,8 -33,9 +33,8 @@@ import org.apache.hadoop.hdfs.protocol. 
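The edit log and image loaders above (FSEditLogLoader, FSImageFormat, FSImageFormatPBINode) all repeat the same dispatch when they re-create an under-construction last block for a file that was open for write: pick the striped UC class when the file is striped, otherwise the contiguous one. A condensed illustration of that pattern (a hypothetical helper written against the branch-internal classes, not code from the patch):

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;

class LastBlockUcSketch {
  /** Re-create the last block as under-construction, honoring the file layout. */
  static BlockInfo toUnderConstruction(Block lastBlk, boolean isStriped,
      ErasureCodingPolicy ecPolicy, short replication) {
    return isStriped
        ? new BlockInfoStripedUnderConstruction(lastBlk, ecPolicy)
        : new BlockInfoContiguousUnderConstruction(lastBlk, replication);
  }
}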
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.LayoutVersion; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; - import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap; @@@ -138,10 -137,9 +138,10 @@@ public class FSImageSerialization // last block is UNDER_CONSTRUCTION if(numBlocks > 0) { blk.readFields(in); - blocksContiguous[i] = new BlockInfoUnderConstructionContiguous( - blocks[i] = new BlockInfoContiguousUnderConstruction( - blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null); ++ blocksContiguous[i] = new BlockInfoContiguousUnderConstruction( + blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null); } + PermissionStatus perm = PermissionStatus.read(in); String clientName = readString(in); String clientMachine = readString(in); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 657f29d,d34242c..2eb9f2a --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@@ -168,10 -166,8 +165,9 @@@ import org.apache.hadoop.hdfs.DFSUtil import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException; - import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; import org.apache.hadoop.hdfs.protocol.CachePoolEntry; @@@ -207,10 -199,7 +203,10 @@@ import org.apache.hadoop.hdfs.security. 
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; - import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstructionContiguous; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@@ -4240,8 -3714,9 +3774,9 @@@ public class FSNamesystem implements Na while (it.hasNext()) { Block b = it.next(); - BlockInfo blockInfo = blockManager.getStoredBlock(b); - if (blockInfo.getBlockCollection().getStoragePolicyID() - == lpPolicy.getId()) { + BlockInfo blockInfo = getStoredBlock(b); - if (blockInfo.getBlockCollection().getStoragePolicyID() == lpPolicy.getId()) { ++ if (blockInfo.getBlockCollection().getStoragePolicyID() == ++ lpPolicy.getId()) { filesToDelete.add(blockInfo.getBlockCollection()); } } @@@ -6670,8 -6142,8 +6220,9 @@@ public void setFSDirectory(FSDirectory dir) { this.dir = dir; } + /** @return the cache manager. */ + @Override public CacheManager getCacheManager() { return cacheManager; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 25415ef,3f242e0..13f180a --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@@ -39,11 -37,10 +39,12 @@@ import org.apache.hadoop.hdfs.protocol. import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList; import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshotFeature; @@@ -445,16 -413,6 +446,15 @@@ public class INodeFile extends INodeWit setStoragePolicyID(storagePolicyId); } - + /** + * @return true if the file is in the striping layout. 
+ */ + @VisibleForTesting + @Override + public boolean isStriped() { + return HeaderFormat.isStriped(header); + } + @Override // INodeFileAttributes public long getHeaderLong() { return header; } @@@ -483,9 -439,9 +483,8 @@@ snapshotBlocks = getDiffs().findLaterSnapshotBlocks(snapshot); return (snapshotBlocks == null) ? getBlocks() : snapshotBlocks; } -- - /** Used during concat to update the BlockCollection for each block */ - void updateBlockCollection() { + /** Used during concat to update the BlockCollection for each block. */ + private void updateBlockCollection() { if (blocks != null) { for(BlockInfo b : blocks) { b.setBlockCollection(this); http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java index f93218f,3a5dc12..2cbecdc --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java @@@ -63,17 -78,16 +78,17 @@@ public class NameNodeLayoutVersion * </ul> */ public static enum Feature implements LayoutFeature { - ROLLING_UPGRADE(-55, -53, "Support rolling upgrade", false), - EDITLOG_LENGTH(-56, "Add length field to every edit log op"), - XATTRS(-57, "Extended attributes"), - CREATE_OVERWRITE(-58, "Use single editlog record for " + + ROLLING_UPGRADE(-55, -53, -55, "Support rolling upgrade", false), + EDITLOG_LENGTH(-56, -56, "Add length field to every edit log op"), + XATTRS(-57, -57, "Extended attributes"), + CREATE_OVERWRITE(-58, -58, "Use single editlog record for " + "creating file with overwrite"), - XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"), - BLOCK_STORAGE_POLICY(-60, "Block Storage policy"), - TRUNCATE(-61, "Truncate"), - APPEND_NEW_BLOCK(-62, "Support appending to new block"), - QUOTA_BY_STORAGE_TYPE(-63, "Support quota for specific storage types"), - ERASURE_CODING(-64, "Support erasure coding"); + XATTRS_NAMESPACE_EXT(-59, -59, "Increase number of xattr namespaces"), + BLOCK_STORAGE_POLICY(-60, -60, "Block Storage policy"), + TRUNCATE(-61, -61, "Truncate"), + APPEND_NEW_BLOCK(-62, -61, "Support appending to new block"), - QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types"); ++ QUOTA_BY_STORAGE_TYPE(-63, -61, "Support quota for specific storage types"), ++ ERASURE_CODING(-64, -61, "Support erasure coding"); private final FeatureInfo info; http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc10933a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ----------------------------------------------------------------------
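One note on the NameNodeLayoutVersion hunk above: the extra integer threaded through every Feature is the minimum compatible layout version, so ERASURE_CODING bumps the layout to -64 while declaring compatibility back to -61 (TRUNCATE). Conceptually the downgrade check then becomes a range test rather than an exact match; the following is a hypothetical helper, not the HDFS implementation (layout versions are negative and grow more negative over time):

public class LayoutCompatSketch {
  /**
   * True if software whose newest supported layout version is softwareLV can
   * load storage written at storageLV that declares minCompatLV.
   */
  static boolean canLoad(int softwareLV, int storageLV, int minCompatLV) {
    if (softwareLV <= storageLV) {
      return true;                    // software is as new as, or newer than, the storage
    }
    return softwareLV <= minCompatLV; // older software, but inside the declared window
  }

  public static void main(String[] args) {
    // Storage written with ERASURE_CODING: layout -64, minimum compatible -61.
    System.out.println(canLoad(-61, -64, -61)); // true: a -61 release can still load it
    System.out.println(canLoad(-60, -64, -61)); // false: predates TRUNCATE
  }
}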