http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 6093776,0000000..7b21cbe mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@@ -1,258 -1,0 +1,253 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + +/** + * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. + * + * We still use triplets to store DatanodeStorageInfo for each block in the + * block group, as well as the previous/next block in the corresponding + * DatanodeStorageInfo. For a (m+k) block group, the first (m+k) triplet units + * are sorted and strictly mapped to the corresponding block. + * + * Normally each block belonging to group is stored in only one DataNode. + * However, it is possible that some block is over-replicated. Thus the triplet + * array's size can be larger than (m+k). Thus currently we use an extra byte + * array to record the block index for each triplet. + */ +public class BlockInfoStriped extends BlockInfo { + private final ErasureCodingPolicy ecPolicy; + /** + * Always the same size with triplets. Record the block index for each triplet + * TODO: actually this is only necessary for over-replicated block. Thus can + * be further optimized to save memory usage. 
+ */ + private byte[] indices; + + public BlockInfoStriped(Block blk, ErasureCodingPolicy ecPolicy) { + super(blk, (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits())); + indices = new byte[ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()]; + initIndices(); + this.ecPolicy = ecPolicy; + } + - BlockInfoStriped(BlockInfoStriped b) { - this(b, b.getErasureCodingPolicy()); - this.setBlockCollection(b.getBlockCollection()); - } - + public short getTotalBlockNum() { + return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits()); + } + + public short getDataBlockNum() { + return (short) ecPolicy.getNumDataUnits(); + } + + public short getParityBlockNum() { + return (short) ecPolicy.getNumParityUnits(); + } + + /** + * If the block is committed/completed and its length is less than a full + * stripe, it returns the the number of actual data blocks. + * Otherwise it returns the number of data units specified by erasure coding policy. + */ + public short getRealDataBlockNum() { + if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) { + return (short) Math.min(getDataBlockNum(), + (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1); + } else { + return getDataBlockNum(); + } + } + + public short getRealTotalBlockNum() { + return (short) (getRealDataBlockNum() + getParityBlockNum()); + } + + public ErasureCodingPolicy getErasureCodingPolicy() { + return ecPolicy; + } + + private void initIndices() { + for (int i = 0; i < indices.length; i++) { + indices[i] = -1; + } + } + + private int findSlot() { + int i = getTotalBlockNum(); + for (; i < getCapacity(); i++) { + if (getStorageInfo(i) == null) { + return i; + } + } + // need to expand the triplet size + ensureCapacity(i + 1, true); + return i; + } + + @Override + boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { + int blockIndex = BlockIdManager.getBlockIndex(reportedBlock); + int index = blockIndex; + DatanodeStorageInfo old = getStorageInfo(index); + if (old != null && !old.equals(storage)) { // over replicated + // check if the storage has been stored + int i = findStorageInfo(storage); + if (i == -1) { + index = findSlot(); + } else { + return true; + } + } + addStorage(storage, index, blockIndex); + return true; + } + + private void addStorage(DatanodeStorageInfo storage, int index, + int blockIndex) { + setStorageInfo(index, storage); + setNext(index, null); + setPrevious(index, null); + indices[index] = (byte) blockIndex; + } + + private int findStorageInfoFromEnd(DatanodeStorageInfo storage) { + final int len = getCapacity(); + for(int idx = len - 1; idx >= 0; idx--) { + DatanodeStorageInfo cur = getStorageInfo(idx); + if (storage.equals(cur)) { + return idx; + } + } + return -1; + } + + int getStorageBlockIndex(DatanodeStorageInfo storage) { + int i = this.findStorageInfo(storage); + return i == -1 ? -1 : indices[i]; + } + + /** + * Identify the block stored in the given datanode storage. Note that + * the returned block has the same block Id with the one seen/reported by the + * DataNode. 
+ */ + Block getBlockOnStorage(DatanodeStorageInfo storage) { + int index = getStorageBlockIndex(storage); + if (index < 0) { + return null; + } else { + Block block = new Block(this); + block.setBlockId(this.getBlockId() + index); + return block; + } + } + + @Override + boolean removeStorage(DatanodeStorageInfo storage) { + int dnIndex = findStorageInfoFromEnd(storage); + if (dnIndex < 0) { // the node is not found + return false; + } + assert getPrevious(dnIndex) == null && getNext(dnIndex) == null : + "Block is still in the list and must be removed first."; + // set the triplet to null + setStorageInfo(dnIndex, null); + setNext(dnIndex, null); + setPrevious(dnIndex, null); + indices[dnIndex] = -1; + return true; + } + + private void ensureCapacity(int totalSize, boolean keepOld) { + if (getCapacity() < totalSize) { + Object[] old = triplets; + byte[] oldIndices = indices; + triplets = new Object[totalSize * 3]; + indices = new byte[totalSize]; + initIndices(); + + if (keepOld) { + System.arraycopy(old, 0, triplets, 0, old.length); + System.arraycopy(oldIndices, 0, indices, 0, oldIndices.length); + } + } + } + + @Override + void replaceBlock(BlockInfo newBlock) { + assert newBlock instanceof BlockInfoStriped; + BlockInfoStriped newBlockGroup = (BlockInfoStriped) newBlock; + final int size = getCapacity(); + newBlockGroup.ensureCapacity(size, false); + for (int i = 0; i < size; i++) { + final DatanodeStorageInfo storage = this.getStorageInfo(i); + if (storage != null) { + final int blockIndex = indices[i]; + final boolean removed = storage.removeBlock(this); + assert removed : "currentBlock not found."; + + newBlockGroup.addStorage(storage, i, blockIndex); + storage.insertToList(newBlockGroup); + } + } + } + + public long spaceConsumed() { + // In case striped blocks, total usage by this striped blocks should + // be the total of data blocks and parity blocks because + // `getNumBytes` is the total of actual data block size. + return StripedBlockUtil.spaceConsumedByStripedBlock(getNumBytes(), + ecPolicy.getNumDataUnits(), ecPolicy.getNumParityUnits(), + BLOCK_STRIPED_CELL_SIZE); + } + + @Override + public final boolean isStriped() { + return true; + } + + @Override + public int numNodes() { + assert this.triplets != null : "BlockInfo is not initialized"; + assert triplets.length % 3 == 0 : "Malformed BlockInfo"; + int num = 0; + for (int idx = getCapacity()-1; idx >= 0; idx--) { + if (getStorageInfo(idx) != null) { + num++; + } + } + return num; + } + + @Override + final boolean hasNoStorage() { + final int len = getCapacity(); + for(int idx = 0; idx < len; idx++) { + if (getStorageInfo(idx) != null) { + return false; + } + } + return true; + } +}
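For readers skimming the BlockInfoStriped changes above, two pieces of arithmetic are worth calling out: getBlockOnStorage derives an internal block's ID by adding the storage's index to the block group ID (so the low bits of a reported block ID identify its position in the group), and getRealDataBlockNum caps the data-block count by the number of cells the committed length actually fills. Below is a minimal standalone Java sketch of that arithmetic, not code from this commit: the class name StripedBlockMath, the 64 KB cell size, the 6+3 schema, and the sample group ID are illustrative assumptions only.

// Standalone illustration; mirrors the arithmetic in BlockInfoStriped above.
public class StripedBlockMath {
  static final long CELL_SIZE = 64 * 1024; // assumed cell size, not read from the patch
  static final int DATA_UNITS = 6;         // assumed 6+3 schema
  static final int PARITY_UNITS = 3;

  /** Internal block ID = block group ID + index in the group (cf. getBlockOnStorage). */
  static long internalBlockId(long blockGroupId, int indexInGroup) {
    return blockGroupId + indexInGroup;
  }

  /** Data blocks that actually hold bytes for a committed length (cf. getRealDataBlockNum). */
  static int realDataBlockNum(long numBytes) {
    long cellsUsed = (numBytes - 1) / CELL_SIZE + 1; // ceil(numBytes / CELL_SIZE)
    return (int) Math.min(DATA_UNITS, cellsUsed);
  }

  public static void main(String[] args) {
    // Hypothetical group ID; real IDs come from SequentialBlockGroupIdGenerator.
    long groupId = -9223372036854775552L;
    System.out.println("first parity block id = " + internalBlockId(groupId, DATA_UNITS));
    System.out.println("data blocks for 100KB = " + realDataBlockNum(100 * 1024)); // 2 of the 6
  }
}

Because the index occupies the low bits of the ID, SequentialBlockGroupIdGenerator (further down in this diff) hands out group IDs spaced MAX_BLOCKS_IN_GROUP apart, and clearing those low bits (the bits covered by BLOCK_GROUP_INDEX_MASK) recovers the group ID from any internal block.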
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index ae08825,95933d2..6c6d758 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@@ -89,12 -84,8 +88,13 @@@ import org.apache.hadoop.hdfs.server.pr import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; + import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; + import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; @@@ -216,8 -202,8 +216,8 @@@ public class BlockManager implements Bl * Maps a StorageID to the set of blocks that are "extra" for this * DataNode. We'll eventually remove these extras. */ - public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap = + public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap = - new TreeMap<>(); + new HashMap<>(); /** * Store set of Blocks that need to be replicated 1 or more times. 
@@@ -689,26 -662,20 +689,25 @@@ */ private BlockInfo completeBlock(final BlockCollection bc, final int blkIndex, boolean force) throws IOException { - if(blkIndex < 0) + if (blkIndex < 0) { return null; + } BlockInfo curBlock = bc.getBlocks()[blkIndex]; - if(curBlock.isComplete()) + if (curBlock.isComplete()) { return curBlock; + } int numNodes = curBlock.numNodes(); - if (!force && numNodes < minReplication) + if (!force && !hasMinStorage(curBlock, numNodes)) { throw new IOException("Cannot complete block: " + "block does not satisfy minimal replication requirement."); - if(!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) + } + if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) { throw new IOException( "Cannot complete block: block has not been COMMITTED by the client"); - BlockInfo completeBlock = curBlock.convertToCompleteBlock(); + } + + final BlockInfo completeBlock = curBlock.convertToCompleteBlock(); - // replace penultimate block in file bc.setBlock(blkIndex, completeBlock); @@@ -1276,9 -1186,8 +1276,9 @@@ " corrupt as it does not belong to any file", b); addToInvalidates(b.corrupted, node); return; -- } - short expectedReplicas = b.corrupted.getReplication(); ++ } + short expectedReplicas = - getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored); ++ getExpectedReplicaNum(b.stored); // Add replica to the data-node if it is not already there if (storageInfo != null) { @@@ -1446,10 -1350,10 +1446,10 @@@ namesystem.writeLock(); try { synchronized (neededReplications) { - for (int priority = 0; priority < blocksToReplicate.size(); priority++) { - for (BlockInfo block : blocksToReplicate.get(priority)) { + for (int priority = 0; priority < blocksToRecover.size(); priority++) { + for (BlockInfo block : blocksToRecover.get(priority)) { // block should belong to a file - bc = blocksMap.getBlockCollection(block); + bc = getBlockCollection(block); // abandoned block or block reopened for append if (bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { @@@ -1458,20 -1362,17 +1458,20 @@@ continue; } - requiredReplication = getExpectedReplicaNum(bc, block); + requiredReplication = getExpectedReplicaNum(block); // get a source data-node - containingNodes = new ArrayList<DatanodeDescriptor>(); - List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>(); + containingNodes = new ArrayList<>(); + List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>(); NumberReplicas numReplicas = new NumberReplicas(); - srcNode = chooseSourceDatanode( - block, containingNodes, liveReplicaNodes, numReplicas, - priority); - if(srcNode == null) { // block can not be replicated from any node - LOG.debug("Block " + block + " cannot be repl from any node"); + List<Short> liveBlockIndices = new ArrayList<>(); + final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block, + containingNodes, liveReplicaNodes, numReplicas, + liveBlockIndices, priority); + if(srcNodes == null || srcNodes.length == 0) { + // block can not be replicated from any node + LOG.debug("Block " + block + " cannot be recovered " + + "from any node"); continue; } @@@ -1588,32 -1474,7 +1588,32 @@@ } // Add block to the to be replicated list - rw.srcNode.addBlockToBeReplicated(block, targets); + if (block.isStriped()) { + assert rw instanceof ErasureCodingWork; + assert rw.targets.length > 0; - String src = block.getBlockCollection().getName(); ++ String src = getBlockCollection(block).getName(); + ErasureCodingZone ecZone = null; + try { + ecZone = 
namesystem.getErasureCodingZoneForPath(src); + } catch (IOException e) { + blockLog + .warn("Failed to get the EC zone for the file {} ", src); + } + if (ecZone == null) { + blockLog.warn("No erasure coding policy found for the file {}. " + + "So cannot proceed for recovery", src); + // TODO: we may have to revisit later for what we can do better to + // handle this case. + continue; + } + rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded( + new ExtendedBlock(namesystem.getBlockPoolId(), block), + rw.srcNodes, rw.targets, + ((ErasureCodingWork) rw).liveBlockIndicies, + ecZone.getErasureCodingPolicy()); + } else { + rw.srcNodes[0].addBlockToBeReplicated(block, targets); + } scheduledWork++; DatanodeStorageInfo.incrementBlocksScheduled(targets); @@@ -2079,8 -1924,8 +2080,8 @@@ private void removeZombieReplicas(BlockReportContext context, DatanodeStorageInfo zombie) { LOG.warn("processReport 0x{}: removing zombie storage {}, which no " + -- "longer exists on the DataNode.", -- Long.toHexString(context.getReportId()), zombie.getStorageID()); ++ "longer exists on the DataNode.", ++ Long.toHexString(context.getReportId()), zombie.getStorageID()); assert(namesystem.hasWriteLock()); Iterator<BlockInfo> iter = zombie.getBlockIterator(); int prevBlocks = zombie.numBlocks(); @@@ -2324,10 -2164,10 +2325,10 @@@ // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 - if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) { + if (namesystem.isInSnapshot(storedBlock)) { int numOfReplicas = storedBlock.getUnderConstructionFeature() .getNumExpectedLocations(); - namesystem.incrementSafeBlockCount(numOfReplicas); + namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock); } //and fall through to next clause } @@@ -2720,8 -2541,8 +2721,8 @@@ // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED - && numCurrentReplica >= minReplication) { + && hasMinStorage(storedBlock, numCurrentReplica)) { - completeBlock(storedBlock.getBlockCollection(), storedBlock, false); + completeBlock(getBlockCollection(storedBlock), storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block // only complete blocks are counted towards that. @@@ -2760,10 -2580,11 +2761,11 @@@ // it will happen in next block report otherwise. 
return block; } - BlockCollection bc = storedBlock.getBlockCollection(); + BlockCollection bc = getBlockCollection(storedBlock); + assert bc != null : "Block must belong to a file"; // add block to the datanode - AddBlockResult result = storageInfo.addBlock(storedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock); int curReplicaDelta; if (result == AddBlockResult.ADDED) { @@@ -3817,9 -3491,9 +3809,9 @@@ */ public void checkReplication(BlockCollection bc) { for (BlockInfo block : bc.getBlocks()) { - short expected = getExpectedReplicaNum(bc, block); - final short expected = block.getReplication(); ++ short expected = getExpectedReplicaNum(block); final NumberReplicas n = countNodes(block); - if (isNeededReplication(block, expected, n.liveReplicas())) { + if (isNeededReplication(block, n.liveReplicas())) { neededReplications.add(block, n.liveReplicas(), n.decommissionedAndDecommissioning(), expected); } else if (n.liveReplicas() > expected) { @@@ -3978,18 -3605,15 +3968,17 @@@ * A block needs replication if the number of replicas is less than expected * or if it does not have enough racks. */ - boolean isNeededReplication(BlockInfo storedBlock, int expected, int current) { + boolean isNeededReplication(BlockInfo storedBlock, int current) { - int expected = storedBlock.getReplication(); - return current < expected || !blockHasEnoughRacks(storedBlock); ++ int expected = getExpectedReplicaNum(storedBlock); + return current < expected || !blockHasEnoughRacks(storedBlock, expected); } - - public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) { - if (block.isStriped()) { - return ((BlockInfoStriped) block).getRealTotalBlockNum(); - } else { - return bc.getPreferredBlockReplication(); - } + + public short getExpectedReplicaNum(BlockInfo block) { - return block.getReplication(); ++ return block.isStriped() ? ++ ((BlockInfoStriped) block).getRealTotalBlockNum() : ++ block.getReplication(); } - + public long getMissingBlocksCount() { // not locking return this.neededReplications.getCorruptBlockSize(); @@@ -4005,22 -3629,13 +3994,22 @@@ return blocksMap.addBlockCollection(block, bc); } - public BlockCollection getBlockCollection(BlockInfo b) { - return namesystem.getBlockCollection(b.getBlockCollectionId()); + /** + * Do some check when adding a block to blocksmap. + * For HDFS-7994 to check whether then block is a NonEcBlockUsingStripedID. + * + */ + public BlockInfo addBlockCollectionWithCheck( + BlockInfo block, BlockCollection bc) { + if (!hasNonEcBlockUsingStripedID && !block.isStriped() && + BlockIdManager.isStripedBlockID(block.getBlockId())) { + hasNonEcBlockUsingStripedID = true; + } + return addBlockCollection(block, bc); } - public BlockCollection getBlockCollection(Block b) { - return blocksMap.getBlockCollection(b); - /** @return an iterator of the datanodes. 
*/ - public Iterable<DatanodeStorageInfo> getStorages(final Block block) { - return blocksMap.getStorages(block); ++ public BlockCollection getBlockCollection(BlockInfo b) { ++ return namesystem.getBlockCollection(b.getBlockCollectionId()); } public int numCorruptReplicas(Block block) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java index 58b455e,88cf06d..0e92779 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java @@@ -55,11 -58,11 +58,11 @@@ public class BlockUnderConstructionFeat private Block truncateBlock; public BlockUnderConstructionFeature(Block blk, - BlockUCState state, DatanodeStorageInfo[] targets) { + BlockUCState state, DatanodeStorageInfo[] targets, boolean isStriped) { assert getBlockUCState() != COMPLETE : - "BlockUnderConstructionFeature cannot be in COMPLETE state"; + "BlockUnderConstructionFeature cannot be in COMPLETE state"; this.blockUCState = state; - setExpectedLocations(blk, targets); + setExpectedLocations(blk, targets, isStriped); } /** Set expected locations */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java index 5bfae42,33c68f3..51d62c1 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java @@@ -129,18 -118,14 +125,18 @@@ class BlocksMap if (blockInfo == null) return; - blockInfo.setBlockCollection(null); - final int size = blockInfo instanceof BlockInfoContiguous ? - blockInfo.numNodes() : blockInfo.getCapacity(); + blockInfo.setBlockCollectionId(INodeId.INVALID_INODE_ID); - for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) { ++ final int size = blockInfo.isStriped() ? ++ blockInfo.getCapacity() : blockInfo.numNodes(); + for(int idx = size - 1; idx >= 0; idx--) { DatanodeDescriptor dn = blockInfo.getDatanode(idx); - dn.removeBlock(blockInfo); // remove from the list and wipe the location + if (dn != null) { + dn.removeBlock(blockInfo); // remove from the list and wipe the location + } } } - - /** Returns the block object it it exists in the map. */ + + /** Returns the block object if it exists in the map. 
*/ BlockInfo getStoredBlock(Block b) { return blocks.get(b); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index a4d5442,7e3c59b..29e541c --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@@ -38,11 -39,10 +38,12 @@@ import org.apache.hadoop.fs.StorageType import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; + import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.util.EnumCounters; @@@ -223,12 -222,8 +224,11 @@@ public class DatanodeDescriptor extend /** A queue of blocks to be replicated by this datanode */ private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<>(); + /** A queue of blocks to be erasure coded by this datanode */ + private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks = + new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ - private final BlockQueue<BlockInfo> recoverBlocks = - new BlockQueue<>(); + private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>(); /** A set of blocks to be invalidated by this datanode */ private final LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<>(); @@@ -696,24 -662,27 +696,34 @@@ } } + @VisibleForTesting + public boolean containsInvalidateBlock(Block block) { + synchronized (invalidateBlocks) { + return invalidateBlocks.contains(block); + } + } + /** - * @return Approximate number of blocks currently scheduled to be written + * Return the sum of remaining spaces of the specified type. If the remaining + * space of a storage is less than minSize, it won't be counted toward the + * sum. + * + * @param t The storage type. If null, the type is ignored. + * @param minSize The minimum free space required. + * @return the sum of remaining spaces that are bigger than minSize. 
*/ - public long getRemaining(StorageType t) { + public long getRemaining(StorageType t, long minSize) { long remaining = 0; - for(DatanodeStorageInfo s : getStorageInfos()) { - if (s.getStorageType() == t) { - remaining += s.getRemaining(); + for (DatanodeStorageInfo s : getStorageInfos()) { + if (s.getState() == State.NORMAL && + (t == null || s.getStorageType() == t)) { + long r = s.getRemaining(); + if (r >= minSize) { + remaining += r; + } } } - return remaining; + return remaining; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index 5e3cac2,1a20ab0..a80bfd6 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@@ -233,16 -234,16 +234,16 @@@ public class DecommissionManager } /** - * Checks whether a block is sufficiently replicated for decommissioning. - * Full-strength replication is not always necessary, hence "sufficient". + * Checks whether a block is sufficiently replicated/stored for + * decommissioning. For replicated blocks or striped blocks, full-strength + * replication or storage is not always necessary, hence "sufficient". * @return true if sufficient, else false. */ - private boolean isSufficientlyReplicated(BlockInfo block, - BlockCollection bc, + private boolean isSufficient(BlockInfo block, BlockCollection bc, NumberReplicas numberReplicas) { - final int numExpected = blockManager.getExpectedReplicaNum(bc, block); - final int numExpected = block.getReplication(); ++ final int numExpected = blockManager.getExpectedReplicaNum(block); final int numLive = numberReplicas.liveReplicas(); - if (!blockManager.isNeededReplication(block, numExpected, numLive)) { + if (!blockManager.isNeededReplication(block, numLive)) { // Block doesn't need replication. Skip. 
LOG.trace("Block {} does not need replication.", block); return true; @@@ -274,11 -274,12 +275,12 @@@ return false; } - private void logBlockReplicationInfo(BlockInfo block, BlockCollection bc, - private static void logBlockReplicationInfo(BlockInfo block, ++ private void logBlockReplicationInfo(BlockInfo block, + BlockCollection bc, DatanodeDescriptor srcNode, NumberReplicas num, Iterable<DatanodeStorageInfo> storages) { int curReplicas = num.liveReplicas(); - int curExpectedReplicas = blockManager.getExpectedReplicaNum(bc, block); - int curExpectedReplicas = block.getReplication(); ++ int curExpectedReplicas = blockManager.getExpectedReplicaNum(block); StringBuilder nodeList = new StringBuilder(); for (DatanodeStorageInfo storage : storages) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); @@@ -530,8 -536,10 +533,9 @@@ continue; } + BlockCollection bc = namesystem.getBlockCollection(bcId); final NumberReplicas num = blockManager.countNodes(block); final int liveReplicas = num.liveReplicas(); - final int curReplicas = liveReplicas; // Schedule under-replicated blocks for replication if not already // pending @@@ -542,9 -549,9 +545,9 @@@ namesystem.isPopulatingReplQueues()) { // Process these blocks only when active NN is out of safe mode. blockManager.neededReplications.add(block, - curReplicas, + liveReplicas, num.decommissionedAndDecommissioning(), - blockManager.getExpectedReplicaNum(bc, block)); - block.getReplication()); ++ blockManager.getExpectedReplicaNum(block)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java index 479ee4c,0000000..7a52273 mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockGroupIdGenerator.java @@@ -1,85 -1,0 +1,86 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.util.SequentialNumber; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BLOCK_GROUP_INDEX_MASK; +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_BLOCKS_IN_GROUP; + +/** + * Generate the next valid block group ID by incrementing the maximum block + * group ID allocated so far, with the first 2^10 block group IDs reserved. + * HDFS-EC introduces a hierarchical protocol to name blocks and groups: + * Contiguous: {reserved block IDs | flag | block ID} + * Striped: {reserved block IDs | flag | block group ID | index in group} + * + * Following n bits of reserved block IDs, The (n+1)th bit in an ID + * distinguishes contiguous (0) and striped (1) blocks. For a striped block, + * bits (n+2) to (64-m) represent the ID of its block group, while the last m + * bits represent its index of the group. The value m is determined by the + * maximum number of blocks in a group (MAX_BLOCKS_IN_GROUP). + * + * Note that the {@link #nextValue()} methods requires external lock to + * guarantee IDs have no conflicts. + */ +@InterfaceAudience.Private +public class SequentialBlockGroupIdGenerator extends SequentialNumber { + + private final BlockManager blockManager; + + SequentialBlockGroupIdGenerator(BlockManager blockManagerRef) { + super(Long.MIN_VALUE); + this.blockManager = blockManagerRef; + } + + @Override // NumberGenerator + public long nextValue() { + skipTo((getCurrentValue() & ~BLOCK_GROUP_INDEX_MASK) + MAX_BLOCKS_IN_GROUP); + // Make sure there's no conflict with existing random block IDs + final Block b = new Block(getCurrentValue()); + while (hasValidBlockInRange(b)) { + skipTo(getCurrentValue() + MAX_BLOCKS_IN_GROUP); + b.setBlockId(getCurrentValue()); + } + if (b.getBlockId() >= 0) { + throw new IllegalStateException("All negative block group IDs are used, " + + "growing into positive IDs, " + + "which might conflict with non-erasure coded blocks."); + } + return getCurrentValue(); + } + + /** + * @param b A block object whose id is set to the starting point for check + * @return true if any ID in the range - * {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a file ++ * {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a stored ++ * block. 
+ */ + private boolean hasValidBlockInRange(Block b) { + final long id = b.getBlockId(); + for (int i = 0; i < MAX_BLOCKS_IN_GROUP; i++) { + b.setBlockId(id + i); - if (blockManager.getBlockCollection(b) != null) { ++ if (blockManager.getStoredBlock(b) != null) { + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java index 6074784,f053b7b..631b435 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SequentialBlockIdGenerator.java @@@ -19,6 -19,8 +19,7 @@@ package org.apache.hadoop.hdfs.server.b import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; + import org.apache.hadoop.hdfs.server.namenode.INodeId; import org.apache.hadoop.util.SequentialNumber; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 1b695e3,5bc50b0..82a0f62 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@@ -2158,11 -2103,14 +2128,11 @@@ public class DataNode extends Reconfigu // // Header info // - Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN; - if (isBlockTokenEnabled) { - accessToken = blockPoolTokenSecretManager.generateToken(b, - EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); - } + Token<BlockTokenIdentifier> accessToken = getBlockAccessToken(b, + EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE)); long writeTimeout = dnConf.socketWriteTimeout + - HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); + HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); DataEncryptionKeyFactory keyFactory = http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java index 
22d821f,0000000..a0ac033 mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ErasureCodingZoneManager.java @@@ -1,163 -1,0 +1,163 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.hdfs.XAttrHelper; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableUtils; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_ERASURECODING_ZONE; + +/** + * Manages the list of erasure coding zones in the filesystem. + * <p/> + * The ErasureCodingZoneManager has its own lock, but relies on the FSDirectory + * lock being held for many operations. The FSDirectory lock should not be + * taken if the manager lock is already held. + * TODO: consolidate zone logic w/ encrypt. zones {@link EncryptionZoneManager} + */ +public class ErasureCodingZoneManager { + private final FSDirectory dir; + + /** + * Construct a new ErasureCodingZoneManager. + * + * @param dir Enclosing FSDirectory + */ + public ErasureCodingZoneManager(FSDirectory dir) { + this.dir = dir; + } + + ErasureCodingPolicy getErasureCodingPolicy(INodesInPath iip) throws IOException { + ErasureCodingZone ecZone = getErasureCodingZone(iip); + return ecZone == null ? null : ecZone.getErasureCodingPolicy(); + } + + ErasureCodingZone getErasureCodingZone(INodesInPath iip) throws IOException { + assert dir.hasReadLock(); + Preconditions.checkNotNull(iip, "INodes cannot be null"); + List<INode> inodes = iip.getReadOnlyINodes(); + for (int i = inodes.size() - 1; i >= 0; i--) { + final INode inode = inodes.get(i); + if (inode == null) { + continue; + } + // We don't allow symlinks in an EC zone, or pointing to a file/dir in + // an EC. Therefore if a symlink is encountered, the dir shouldn't have + // EC + // TODO: properly support symlinks in EC zones + if (inode.isSymlink()) { + return null; + } + final List<XAttr> xAttrs = inode.getXAttrFeature() == null ? 
+ new ArrayList<XAttr>(0) + : inode.getXAttrFeature().getXAttrs(); + for (XAttr xAttr : xAttrs) { - if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixName(xAttr))) { ++ if (XATTR_ERASURECODING_ZONE.equals(XAttrHelper.getPrefixedName(xAttr))) { + ByteArrayInputStream bIn=new ByteArrayInputStream(xAttr.getValue()); + DataInputStream dIn=new DataInputStream(bIn); + String ecPolicyName = WritableUtils.readString(dIn); + ErasureCodingPolicy ecPolicy = dir.getFSNamesystem() + .getErasureCodingPolicyManager().getPolicy(ecPolicyName); + return new ErasureCodingZone(dir.getInode(inode.getId()) + .getFullPathName(), ecPolicy); + } + } + } + return null; + } + + List<XAttr> createErasureCodingZone(final INodesInPath srcIIP, + ErasureCodingPolicy ecPolicy) throws IOException { + assert dir.hasWriteLock(); + Preconditions.checkNotNull(srcIIP, "INodes cannot be null"); + String src = srcIIP.getPath(); + if (dir.isNonEmptyDirectory(srcIIP)) { + throw new IOException( + "Attempt to create an erasure coding zone for a " + + "non-empty directory " + src); + } + if (srcIIP.getLastINode() != null && + !srcIIP.getLastINode().isDirectory()) { + throw new IOException("Attempt to create an erasure coding zone " + + "for a file " + src); + } + if (getErasureCodingPolicy(srcIIP) != null) { + throw new IOException("Directory " + src + " is already in an " + + "erasure coding zone."); + } + + // System default erasure coding policy will be used since no specified. + if (ecPolicy == null) { + ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy(); + } + + final XAttr ecXAttr; + DataOutputStream dOut = null; + try { + ByteArrayOutputStream bOut = new ByteArrayOutputStream(); + dOut = new DataOutputStream(bOut); + WritableUtils.writeString(dOut, ecPolicy.getName()); + ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_ZONE, + bOut.toByteArray()); + } finally { + IOUtils.closeStream(dOut); + } + final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1); + xattrs.add(ecXAttr); + FSDirXAttrOp.unprotectedSetXAttrs(dir, src, xattrs, + EnumSet.of(XAttrSetFlag.CREATE)); + return xattrs; + } + + void checkMoveValidity(INodesInPath srcIIP, INodesInPath dstIIP, String src) + throws IOException { + assert dir.hasReadLock(); + final ErasureCodingZone srcZone = getErasureCodingZone(srcIIP); + final ErasureCodingZone dstZone = getErasureCodingZone(dstIIP); + if (srcZone != null && srcZone.getDir().equals(src) && dstZone == null) { + return; + } + final ErasureCodingPolicy srcECPolicy = + srcZone != null ? srcZone.getErasureCodingPolicy() : null; + final ErasureCodingPolicy dstECPolicy = + dstZone != null ? 
dstZone.getErasureCodingPolicy() : null; + if (srcECPolicy != null && !srcECPolicy.equals(dstECPolicy) || + dstECPolicy != null && !dstECPolicy.equals(srcECPolicy)) { + throw new IOException( + src + " can't be moved because the source and destination have " + + "different erasure coding policies."); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java index 9b08092,df0bc20..4bed13e --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java @@@ -122,7 -122,7 +122,7 @@@ public class FSDirAttrOp " does not exist."); } boolean changed = unprotectedSetTimes(fsd, inode, mtime, atime, true, -- iip.getLatestSnapshotId()); ++ iip.getLatestSnapshotId()); if (changed) { fsd.getEditLog().logTimes(src, mtime, atime); } @@@ -399,30 -397,26 +397,30 @@@ } static BlockInfo[] unprotectedSetReplication( - FSDirectory fsd, String src, short replication, short[] blockRepls) + FSDirectory fsd, String src, short replication) throws QuotaExceededException, UnresolvedLinkException, - SnapshotAccessControlException { + SnapshotAccessControlException, UnsupportedActionException { assert fsd.hasWriteLock(); + final BlockManager bm = fsd.getBlockManager(); final INodesInPath iip = fsd.getINodesInPath4Write(src, true); final INode inode = iip.getLastINode(); if (inode == null || !inode.isFile()) { return null; } INodeFile file = inode.asFile(); + if (file.isStriped()) { + throw new UnsupportedActionException( + "Cannot set replication to a file with striped blocks"); + } - final short oldBR = file.getPreferredBlockReplication(); + // Make sure the directory has sufficient quotas + short oldBR = file.getPreferredBlockReplication(); - // before setFileReplication, check for increasing block replication. - // if replication > oldBR, then newBR == replication. - // if replication < oldBR, we don't know newBR yet. 
- if (replication > oldBR) { - long dsDelta = file.storagespaceConsumed(null).getStorageSpace() / oldBR; - fsd.updateCount(iip, 0L, dsDelta, oldBR, replication, true); + // Ensure the quota does not exceed + if (oldBR < replication) { + long size = file.computeFileSize(true, true); + fsd.updateCount(iip, 0L, size, oldBR, replication, true); } file.setFileReplication(replication, iip.getLatestSnapshotId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirRenameOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 68aef76,e9d0806..e480959 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@@ -46,7 -44,6 +46,8 @@@ import org.apache.hadoop.hdfs.protocol. 
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; ++ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; @@@ -529,32 -516,15 +530,32 @@@ class FSDirWriteFileOp final INodeFile fileINode = inodesInPath.getLastINode().asFile(); Preconditions.checkState(fileINode.isUnderConstruction()); - // check quota limits and updated space consumed - fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), - fileINode.getFileReplication(), true); - // associate new last block for the file - BlockInfo blockInfo = new BlockInfoContiguous(block, - fileINode.getFileReplication()); - blockInfo.convertToBlockUnderConstruction( - HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); + final BlockInfo blockInfo; + if (isStriped) { + ErasureCodingZone ecZone = FSDirErasureCodingOp.getErasureCodingZone( + fsd.getFSNamesystem(), inodesInPath); + ErasureCodingPolicy ecPolicy = ecZone.getErasureCodingPolicy(); + short numDataUnits = (short) ecPolicy.getNumDataUnits(); + short numParityUnits = (short) ecPolicy.getNumParityUnits(); + short numLocations = (short) (numDataUnits + numParityUnits); + + // check quota limits and updated space consumed + fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), + numLocations, true); + blockInfo = new BlockInfoStriped(block, ecPolicy); + blockInfo.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); + } else { + // check quota limits and updated space consumed + fsd.updateCount(inodesInPath, 0, fileINode.getPreferredBlockSize(), - fileINode.getPreferredBlockReplication(), true); ++ fileINode.getFileReplication(), true); + + short numLocations = fileINode.getFileReplication(); + blockInfo = new BlockInfoContiguous(block, numLocations); + blockInfo.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); + } fsd.getBlockManager().addBlockCollection(blockInfo, fileINode); fileINode.addBlock(blockInfo); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index a61161f,f22762c..7203316 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@@ -1077,29 -1048,19 +1077,29 @@@ public class FSEditLogLoader // TODO: shouldn't this only be true for the last block? // what about an old-version fsync() where fsync isn't called // until several blocks in? 
- newBI = new BlockInfoContiguous(newBlock, - file.getPreferredBlockReplication()); + if (isStriped) { + newBI = new BlockInfoStriped(newBlock, + ecZone.getErasureCodingPolicy()); + } else { + newBI = new BlockInfoContiguous(newBlock, + file.getPreferredBlockReplication()); + } - newBI.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, - null); + newBI.convertToBlockUnderConstruction( - HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null); ++ BlockUCState.UNDER_CONSTRUCTION, null); } else { // OP_CLOSE should add finalized blocks. This code path // is only executed when loading edits written by prior // versions of Hadoop. Current versions always log // OP_ADD operations as each block is allocated. - newBI = new BlockInfoContiguous(newBlock, - file.getFileReplication()); + if (isStriped) { + newBI = new BlockInfoStriped(newBlock, + ErasureCodingPolicyManager.getSystemDefaultPolicy()); + } else { + newBI = new BlockInfoContiguous(newBlock, - file.getPreferredBlockReplication()); ++ file.getFileReplication()); + } } - fsNamesys.getBlockManager().addBlockCollection(newBI, file); + fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBI, file); file.addBlock(newBI); fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index ffaf86b,ac88919..a115138 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@@ -43,9 -42,9 +43,10 @@@ import org.apache.hadoop.hdfs.protocol. 
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; + import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 0d9d427,f4952f7..5f39446 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@@ -7423,11 -7287,30 +7435,35 @@@ public class FSNamesystem implements Na logger.addAppender(asyncAppender); } } - + /** + * Return total number of Sync Operations on FSEditLog. + */ + @Override + @Metric({"TotalSyncCount", + "Total number of sync operations performed on edit logs"}) + public long getTotalSyncCount() { + return fsImage.editLog.getTotalSyncCount(); + } + + /** + * Return total time spent doing sync operations on FSEditLog. + */ + @Override + @Metric({"TotalSyncTimes", + "Total time spend in sync operation on various edit logs"}) + public String getTotalSyncTimes() { + JournalSet journalSet = fsImage.editLog.getJournalSet(); + if (journalSet != null) { + return journalSet.getSyncTimes(); + } else { + return ""; + } + } + + @Override + public ErasureCodingZone getErasureCodingZoneForPath(String src) + throws IOException { + return FSDirErasureCodingOp.getErasureCodingZone(this, src); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 4fa457d,d546905..ae9b0d2 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@@ -38,7 -37,7 +38,8 @@@ import org.apache.hadoop.hdfs.protocol. 
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 9d43c15,7ebe859..e3363a4 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@@ -258,8 -254,7 +258,8 @@@ public class NamenodeFsck implements Da NumberReplicas numberReplicas= bm.countNodes(blockInfo); out.println("Block Id: " + blockId); out.println("Block belongs to: "+iNode.getFullPathName()); - out.println("No. of Expected Replica: " + blockInfo.getReplication()); + out.println("No. of Expected Replica: " + - bm.getExpectedReplicaNum(bc, blockInfo)); ++ bm.getExpectedReplicaNum(blockInfo)); out.println("No. of live Replica: " + numberReplicas.liveReplicas()); out.println("No. of excess Replica: " + numberReplicas.excessReplicas()); out.println("No. of stale Replica: " + http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java index bae033b,4a208d8..923a335 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java @@@ -17,11 -17,9 +17,12 @@@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; + import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.util.RwLock; @@@ -48,17 -48,7 +51,17 @@@ public interface Namesystem extends RwL void checkOperation(OperationCategory read) throws StandbyException; - boolean isInSnapshot(BlockCollection bc); - + /** + * Gets the ECZone for path + * @param src + * - path + * @return {@link ErasureCodingZone} + * @throws IOException + */ + ErasureCodingZone getErasureCodingZoneForPath(String src) + throws IOException; + + boolean isInSnapshot(BlockInfo blockUC); + CacheManager getCacheManager(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java 
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
index 0162f85,91ebaaf..450d981
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FSImageFormatPBSnapshot.java
@@@ -245,11 -245,10 +247,11 @@@ public class FSImageFormatPBSnapshot
           BlockInfo[] blocks = new BlockInfo[bpl.size()];
           for(int j = 0, e = bpl.size(); j < e; ++j) {
             Block blk = PBHelper.convert(bpl.get(j));
-            BlockInfo storedBlock = fsn.getBlockManager().getStoredBlock(blk);
+            BlockInfo storedBlock = bm.getStoredBlock(blk);
             if(storedBlock == null) {
-              storedBlock = bm.addBlockCollection(
-                  new BlockInfoContiguous(blk, copy.getFileReplication()), file);
+              storedBlock = (BlockInfoContiguous) fsn.getBlockManager()
+                  .addBlockCollectionWithCheck(new BlockInfoContiguous(blk,
+                      copy.getFileReplication()), file);
             }
             blocks[j] = storedBlock;
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index fcf2bc1,96776e4..0db56dd
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@@ -2413,24 -2403,13 +2403,34 @@@
 </property>

 <property>
+  <name>dfs.datanode.stripedread.threshold.millis</name>
+  <value>5000</value>
+  <description>datanode striped read threshold in millisecond.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.stripedread.threads</name>
+  <value>20</value>
+  <description>datanode striped read thread pool size.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.stripedread.buffer.size</name>
+  <value>262144</value>
+  <description>datanode striped read buffer size.
+  </description>
+</property>
+
++<property>
+  <name>dfs.namenode.quota.init-threads</name>
+  <value>4</value>
+  <description>
+    The number of concurrent threads to be used in quota initialization. The
+    speed of quota initialization also affects the namenode fail-over latency.
+    If the size of name space is big, try increasing this.
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 59daba4,24e0965..4bb5c64
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@@ -119,8 -119,8 +119,9 @@@ import org.apache.hadoop.security.UserG
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil;
+ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.ToolRunner;

 import com.google.common.base.Joiner;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index ad8f204,c1ed758..eb24fb0
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@@ -955,9 -939,9 +958,9 @@@ public class TestBalancer
   void testBalancer1Internal(Configuration conf) throws Exception {
     initConf(conf);
     testUnevenDistribution(conf,
--        new long[] {50*CAPACITY/100, 10*CAPACITY/100},
++        new long[]{50 * CAPACITY / 100, 10 * CAPACITY / 100},
         new long[]{CAPACITY, CAPACITY},
--        new String[] {RACK0, RACK1});
++        new String[]{RACK0, RACK1});
   }

   @Test(expected=HadoopIllegalArgumentException.class)
@@@ -971,7 -955,7 +974,7 @@@
   public void testBalancerWithNonZeroThreadsForMove() throws Exception {
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
--    testBalancer1Internal (conf);
++    testBalancer1Internal(conf);
   }

   @Test(timeout=100000)
@@@ -981,8 -965,8 +984,8 @@@
   void testBalancer2Internal(Configuration conf) throws Exception {
     initConf(conf);
--    testBalancerDefaultConstructor(conf, new long[] { CAPACITY, CAPACITY },
--        new String[] { RACK0, RACK1 }, CAPACITY, RACK2);
++    testBalancerDefaultConstructor(conf,
         new long[]{CAPACITY, CAPACITY},
++        new String[]{RACK0, RACK1}, CAPACITY, RACK2);
   }

   private void testBalancerDefaultConstructor(Configuration conf,
@@@ -1555,75 -1540,116 +1559,183 @@@
     }
   }

+  /** Balancer should not move blocks with size < minBlockSize. */
+  @Test(timeout=60000)
+  public void testMinBlockSizeAndSourceNodes() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+
+    final short replication = 3;
+    final long[] lengths = {10, 10, 10, 10};
+    final long[] capacities = new long[replication];
+    final long totalUsed = capacities.length * sum(lengths);
+    Arrays.fill(capacities, 1000);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(capacities.length)
+        .simulatedCapacities(capacities)
+        .build();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, dfs.getUri(),
+          ClientProtocol.class).getProxy();
+
+      // fill up the cluster to be 80% full
+      for(int i = 0; i < lengths.length; i++) {
+        final long size = lengths[i];
+        final Path p = new Path("/file" + i + "_size" + size);
+        try(final OutputStream out = dfs.create(p)) {
+          for(int j = 0; j < size; j++) {
+            out.write(j);
+          }
+        }
+      }
+
+      // start up an empty node with the same capacity
+      cluster.startDataNodes(conf, capacities.length, true, null, null, capacities);
+      LOG.info("capacities = " + Arrays.toString(capacities));
+      LOG.info("totalUsedSpace= " + totalUsed);
+      LOG.info("lengths = " + Arrays.toString(lengths) + ", #=" + lengths.length);
+      waitForHeartBeat(totalUsed, 2*capacities[0]*capacities.length, client, cluster);
+
+      final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+      { // run Balancer with min-block-size=50
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            Collections.<String> emptySet(), false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
+      }
+
+      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+
+      { // run Balancer with empty nodes as source nodes
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        for(int i = capacities.length; i < datanodes.size(); i++) {
+          sourceNodes.add(datanodes.get(i).getDisplayName());
+        }
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      }
+
+      { // run Balancer with a filled node as a source node
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        sourceNodes.add(datanodes.get(0).getDisplayName());
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+        final int r = Balancer.run(namenodes, p, conf);
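+        // With only the first (already filled) datanode allowed as a source,
+        // this run is expected to exit with NO_MOVE_BLOCK.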
+        assertEquals(ExitStatus.NO_MOVE_BLOCK.getExitCode(), r);
+      }
+
+      { // run Balancer with all filled node as source nodes
+        final Set<String> sourceNodes = new HashSet<>();
+        final List<DataNode> datanodes = cluster.getDataNodes();
+        for(int i = 0; i < capacities.length; i++) {
+          sourceNodes.add(datanodes.get(i).getDisplayName());
+        }
+        final Parameters p = new Parameters(
+            BalancingPolicy.Node.INSTANCE, 1,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            sourceNodes, false);
+
+        conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
+        final int r = Balancer.run(namenodes, p, conf);
+        assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
-
+  public void integrationTestWithStripedFile(Configuration conf) throws Exception {
+    initConfWithStripe(conf);
+    doTestBalancerWithStripedFile(conf);
+  }
+
+  @Test(timeout = 100000)
+  public void testBalancerWithStripedFile() throws Exception {
+    Configuration conf = new Configuration();
+    initConfWithStripe(conf);
+    doTestBalancerWithStripedFile(conf);
+  }
+
+  private void doTestBalancerWithStripedFile(Configuration conf) throws Exception {
+    int numOfDatanodes = dataBlocks + parityBlocks + 2;
+    int numOfRacks = dataBlocks;
+    long capacity = 20 * DEFAULT_STRIPE_BLOCK_SIZE;
+    long[] capacities = new long[numOfDatanodes];
+    for (int i = 0; i < capacities.length; i++) {
+      capacities[i] = capacity;
+    }
+    String[] racks = new String[numOfDatanodes];
+    for (int i = 0; i < numOfDatanodes; i++) {
+      racks[i] = "/rack" + (i % numOfRacks);
+    }
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numOfDatanodes)
+        .racks(racks)
+        .simulatedCapacities(capacities)
+        .build();
+
+    try {
+      cluster.waitActive();
+      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
+          ClientProtocol.class).getProxy();
+      client.createErasureCodingZone("/", null);
+
+      long totalCapacity = sum(capacities);
+
+      // fill up the cluster with 30% data. It'll be 45% full plus parity.
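+      // (Raw usage counts parity as well: totalUsedSpace below is
+      // fileLen * (dataBlocks + parityBlocks) / dataBlocks.)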
+      long fileLen = totalCapacity * 3 / 10;
+      long totalUsedSpace = fileLen * (dataBlocks + parityBlocks) / dataBlocks;
+      FileSystem fs = cluster.getFileSystem(0);
+      DFSTestUtil.createFile(fs, filePath, fileLen, (short) 3, r.nextLong());
+
+      // verify locations of striped blocks
+      LocatedBlocks locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
+
+      // add one datanode
+      String newRack = "/rack" + (++numOfRacks);
+      cluster.startDataNodes(conf, 1, true, null,
+          new String[]{newRack}, null, new long[]{capacity});
+      totalCapacity += capacity;
+      cluster.triggerHeartbeats();
+
+      // run balancer and validate results
+      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+      runBalancer(conf, totalUsedSpace, totalCapacity, p, 0);
+
+      // verify locations of striped blocks
+      locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
+      DFSTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
-
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * @param args
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab56fcdb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
----------------------------------------------------------------------
diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index bae4f1d,ceef9f2..d6213ff
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@@ -70,18 -71,18 +71,6 @@@ public class TestBlockInfo
   }

   @Test
-- public void testCopyConstructor() {
-    BlockInfo old = new BlockInfoContiguous((short) 3);
-    BlockInfoContiguous old = new BlockInfoContiguous((short) 3);
--   try {
-      BlockInfo copy = new BlockInfoContiguous((BlockInfoContiguous)old);
-      assertEquals(old.getBlockCollection(), copy.getBlockCollection());
-      BlockInfoContiguous copy = new BlockInfoContiguous(old);
-      assertEquals(old.getBlockCollectionId(), copy.getBlockCollectionId());
--     assertEquals(old.getCapacity(), copy.getCapacity());
--   } catch (Exception e) {
--     Assert.fail("Copy constructor throws exception: " + e);
--   }
-- }
--
-- @Test
   public void testReplaceStorage() throws Exception {

     // Create two dummy storages.