HDFS-8801. Convert BlockInfoUnderConstruction as a feature. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e535e0f0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e535e0f0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e535e0f0 Branch: refs/heads/YARN-1197 Commit: e535e0f05b5fbd087c93238deb888cc985254b4c Parents: 456e901 Author: Haohui Mai <[email protected]> Authored: Mon Aug 17 11:28:28 2015 -0700 Committer: Haohui Mai <[email protected]> Committed: Mon Aug 17 11:28:28 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/blockmanagement/BlockCollection.java | 3 +- .../hdfs/server/blockmanagement/BlockInfo.java | 134 ++++-- .../BlockInfoContiguousUnderConstruction.java | 403 ------------------- .../server/blockmanagement/BlockManager.java | 99 ++--- .../BlockUnderConstructionFeature.java | 335 +++++++++++++++ .../blockmanagement/DatanodeDescriptor.java | 11 +- .../server/blockmanagement/DatanodeManager.java | 19 +- .../hdfs/server/namenode/FSDirTruncateOp.java | 39 +- .../hdfs/server/namenode/FSDirWriteFileOp.java | 24 +- .../hdfs/server/namenode/FSEditLogLoader.java | 25 +- .../hdfs/server/namenode/FSImageFormat.java | 5 +- .../server/namenode/FSImageFormatPBINode.java | 7 +- .../server/namenode/FSImageSerialization.java | 10 +- .../hdfs/server/namenode/FSNamesystem.java | 50 +-- .../namenode/FileUnderConstructionFeature.java | 8 +- .../hadoop/hdfs/server/namenode/INodeFile.java | 29 +- .../hadoop/hdfs/server/namenode/Namesystem.java | 4 +- .../server/namenode/snapshot/FileDiffList.java | 3 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 9 +- .../TestBlockInfoUnderConstruction.java | 18 +- .../blockmanagement/TestBlockManager.java | 9 +- .../blockmanagement/TestHeartbeatHandling.java | 22 +- .../blockmanagement/TestReplicationPolicy.java | 13 +- .../namenode/TestBlockUnderConstruction.java | 6 +- .../TestCommitBlockSynchronization.java | 9 +- .../hdfs/server/namenode/TestFileTruncate.java | 5 +- .../namenode/ha/TestRetryCacheWithHA.java | 11 +- .../namenode/snapshot/SnapshotTestHelper.java | 5 +- 29 files changed, 642 insertions(+), 676 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c12e678..5866237 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -796,6 +796,9 @@ Release 2.8.0 - UNRELEASED HDFS-6407. Add sorting and pagination in the datanode tab of the NN Web UI. (wheat9) + HDFS-8801. Convert BlockInfoUnderConstruction as a feature. + (Jing Zhao via wheat9) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java index 02a1d05..a3b4401 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java @@ -79,8 +79,7 @@ public interface BlockCollection { * Convert the last block of the collection to an under-construction block * and set the locations. */ - public BlockInfoContiguousUnderConstruction setLastBlock(BlockInfo lastBlock, - DatanodeStorageInfo[] targets) throws IOException; + public void convertLastBlockToUC(BlockInfo lastBlock, DatanodeStorageInfo[] targets) throws IOException; /** * @return whether the block collection is under construction. http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index dea31c4..94dac35 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -17,11 +17,16 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import java.io.IOException; import java.util.LinkedList; +import java.util.List; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature.ReplicaUnderConstruction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.util.LightWeightGSet; /** @@ -32,6 +37,7 @@ import org.apache.hadoop.util.LightWeightGSet; @InterfaceAudience.Private public abstract class BlockInfo extends Block implements LightWeightGSet.LinkedElement { + public static final BlockInfo[] EMPTY_ARRAY = {}; private BlockCollection bc; @@ -53,6 +59,8 @@ public abstract class BlockInfo extends Block */ protected Object[] triplets; + private BlockUnderConstructionFeature uc; + /** * Construct an entry for blocksmap * @param replication the block's replication factor @@ -181,7 +189,6 @@ public abstract class BlockInfo extends Block */ abstract boolean removeStorage(DatanodeStorageInfo storage); - /** * Replace the current BlockInfo with the new one in corresponding * DatanodeStorageInfo's linked list @@ -292,14 +299,36 @@ public abstract class BlockInfo extends Block return this; } - /** - * BlockInfo represents a block that is not being constructed. - * In order to start modifying the block, the BlockInfo should be converted - * to {@link BlockInfoContiguousUnderConstruction}. - * @return {@link BlockUCState#COMPLETE} - */ + @Override + public int hashCode() { + // Super implementation is sufficient + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + // Sufficient to rely on super's implementation + return (this == obj) || super.equals(obj); + } + + @Override + public LightWeightGSet.LinkedElement getNext() { + return nextLinkedElement; + } + + @Override + public void setNext(LightWeightGSet.LinkedElement next) { + this.nextLinkedElement = next; + } + + /* UnderConstruction Feature related */ + + public BlockUnderConstructionFeature getUnderConstructionFeature() { + return uc; + } + public BlockUCState getBlockUCState() { - return BlockUCState.COMPLETE; + return uc == null ? BlockUCState.COMPLETE : uc.getBlockUCState(); } /** @@ -312,46 +341,69 @@ public abstract class BlockInfo extends Block } /** - * Convert a complete block to an under construction block. - * @return BlockInfoUnderConstruction - an under construction block. + * Add/Update the under construction feature. */ - public BlockInfoContiguousUnderConstruction convertToBlockUnderConstruction( - BlockUCState s, DatanodeStorageInfo[] targets) { - if(isComplete()) { - BlockInfoContiguousUnderConstruction ucBlock = - new BlockInfoContiguousUnderConstruction(this, - getBlockCollection().getPreferredBlockReplication(), s, targets); - ucBlock.setBlockCollection(getBlockCollection()); - return ucBlock; + public void convertToBlockUnderConstruction(BlockUCState s, + DatanodeStorageInfo[] targets) { + if (isComplete()) { + uc = new BlockUnderConstructionFeature(this, s, targets); + } else { + // the block is already under construction + uc.setBlockUCState(s); + uc.setExpectedLocations(this.getGenerationStamp(), targets); } - // the block is already under construction - BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction)this; - ucBlock.setBlockUCState(s); - ucBlock.setExpectedLocations(targets); - ucBlock.setBlockCollection(getBlockCollection()); - return ucBlock; - } - - @Override - public int hashCode() { - // Super implementation is sufficient - return super.hashCode(); } - @Override - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); + /** + * Convert an under construction block to a complete block. + * + * @return BlockInfo - a complete block. + * @throws IOException if the state of the block + * (the generation stamp and the length) has not been committed by + * the client or it does not have at least a minimal number of replicas + * reported from data-nodes. + */ + BlockInfo convertToCompleteBlock() throws IOException { + assert getBlockUCState() != BlockUCState.COMPLETE : + "Trying to convert a COMPLETE block"; + uc = null; + return this; } - @Override - public LightWeightGSet.LinkedElement getNext() { - return nextLinkedElement; + /** + * Process the recorded replicas. When about to commit or finish the + * pipeline recovery sort out bad replicas. + * @param genStamp The final generation stamp for the block. + */ + public void setGenerationStampAndVerifyReplicas(long genStamp) { + Preconditions.checkState(uc != null && !isComplete()); + // Set the generation stamp for the block. + setGenerationStamp(genStamp); + + // Remove the replicas with wrong gen stamp + List<ReplicaUnderConstruction> staleReplicas = uc.getStaleReplicas(genStamp); + for (ReplicaUnderConstruction r : staleReplicas) { + r.getExpectedStorageLocation().removeBlock(this); + NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica " + + "from location: {}", r.getExpectedStorageLocation()); + } } - @Override - public void setNext(LightWeightGSet.LinkedElement next) { - this.nextLinkedElement = next; + /** + * Commit block's length and generation stamp as reported by the client. + * Set block state to {@link BlockUCState#COMMITTED}. + * @param block - contains client reported block length and generation + * @throws IOException if block ids are inconsistent. + */ + void commitBlock(Block block) throws IOException { + if (getBlockId() != block.getBlockId()) { + throw new IOException("Trying to commit inconsistent block: id = " + + block.getBlockId() + ", expected id = " + getBlockId()); + } + Preconditions.checkState(!isComplete()); + uc.commit(); + this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); + // Sort out invalid replicas. + setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java deleted file mode 100644 index 7ca6419..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java +++ /dev/null @@ -1,403 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.namenode.NameNode; - -/** - * Represents a block that is currently being constructed.<br> - * This is usually the last block of a file opened for write or append. - */ -public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { - /** Block state. See {@link BlockUCState} */ - private BlockUCState blockUCState; - - /** - * Block replicas as assigned when the block was allocated. - * This defines the pipeline order. - */ - private List<ReplicaUnderConstruction> replicas; - - /** - * Index of the primary data node doing the recovery. Useful for log - * messages. - */ - private int primaryNodeIndex = -1; - - /** - * The new generation stamp, which this block will have - * after the recovery succeeds. Also used as a recovery id to identify - * the right recovery if any of the abandoned recoveries re-appear. - */ - private long blockRecoveryId = 0; - - /** - * The block source to use in the event of copy-on-write truncate. - */ - private Block truncateBlock; - - /** - * ReplicaUnderConstruction contains information about replicas while - * they are under construction. - * The GS, the length and the state of the replica is as reported by - * the data-node. - * It is not guaranteed, but expected, that data-nodes actually have - * corresponding replicas. - */ - static class ReplicaUnderConstruction extends Block { - private final DatanodeStorageInfo expectedLocation; - private ReplicaState state; - private boolean chosenAsPrimary; - - ReplicaUnderConstruction(Block block, - DatanodeStorageInfo target, - ReplicaState state) { - super(block); - this.expectedLocation = target; - this.state = state; - this.chosenAsPrimary = false; - } - - /** - * Expected block replica location as assigned when the block was allocated. - * This defines the pipeline order. - * It is not guaranteed, but expected, that the data-node actually has - * the replica. - */ - private DatanodeStorageInfo getExpectedStorageLocation() { - return expectedLocation; - } - - /** - * Get replica state as reported by the data-node. - */ - ReplicaState getState() { - return state; - } - - /** - * Whether the replica was chosen for recovery. - */ - boolean getChosenAsPrimary() { - return chosenAsPrimary; - } - - /** - * Set replica state. - */ - void setState(ReplicaState s) { - state = s; - } - - /** - * Set whether this replica was chosen for recovery. - */ - void setChosenAsPrimary(boolean chosenAsPrimary) { - this.chosenAsPrimary = chosenAsPrimary; - } - - /** - * Is data-node the replica belongs to alive. - */ - boolean isAlive() { - return expectedLocation.getDatanodeDescriptor().isAlive; - } - - @Override // Block - public int hashCode() { - return super.hashCode(); - } - - @Override // Block - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(50); - appendStringTo(b); - return b.toString(); - } - - @Override - public void appendStringTo(StringBuilder sb) { - sb.append("ReplicaUC[") - .append(expectedLocation) - .append("|") - .append(state) - .append("]"); - } - } - - /** - * Create block and set its state to - * {@link BlockUCState#UNDER_CONSTRUCTION}. - */ - public BlockInfoContiguousUnderConstruction(Block blk, short replication) { - this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null); - } - - /** - * Create a block that is currently being constructed. - */ - public BlockInfoContiguousUnderConstruction(Block blk, short replication, - BlockUCState state, DatanodeStorageInfo[] targets) { - super(blk, replication); - assert getBlockUCState() != BlockUCState.COMPLETE : - "BlockInfoUnderConstruction cannot be in COMPLETE state"; - this.blockUCState = state; - setExpectedLocations(targets); - } - - /** - * Convert an under construction block to a complete block. - * - * @return BlockInfo - a complete block. - * @throws IOException if the state of the block - * (the generation stamp and the length) has not been committed by - * the client or it does not have at least a minimal number of replicas - * reported from data-nodes. - */ - BlockInfo convertToCompleteBlock() throws IOException { - assert getBlockUCState() != BlockUCState.COMPLETE : - "Trying to convert a COMPLETE block"; - return new BlockInfoContiguous(this); - } - - /** Set expected locations */ - public void setExpectedLocations(DatanodeStorageInfo[] targets) { - int numLocations = targets == null ? 0 : targets.length; - this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations); - for(int i = 0; i < numLocations; i++) - replicas.add( - new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW)); - } - - /** - * Create array of expected replica locations - * (as has been assigned by chooseTargets()). - */ - public DatanodeStorageInfo[] getExpectedStorageLocations() { - int numLocations = replicas == null ? 0 : replicas.size(); - DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; - for(int i = 0; i < numLocations; i++) - storages[i] = replicas.get(i).getExpectedStorageLocation(); - return storages; - } - - /** Get the number of expected locations */ - public int getNumExpectedLocations() { - return replicas == null ? 0 : replicas.size(); - } - - /** - * Return the state of the block under construction. - * @see BlockUCState - */ - @Override // BlockInfo - public BlockUCState getBlockUCState() { - return blockUCState; - } - - void setBlockUCState(BlockUCState s) { - blockUCState = s; - } - - /** Get block recovery ID */ - public long getBlockRecoveryId() { - return blockRecoveryId; - } - - /** Get recover block */ - public Block getTruncateBlock() { - return truncateBlock; - } - - public void setTruncateBlock(Block recoveryBlock) { - this.truncateBlock = recoveryBlock; - } - - /** - * Process the recorded replicas. When about to commit or finish the - * pipeline recovery sort out bad replicas. - * @param genStamp The final generation stamp for the block. - */ - public void setGenerationStampAndVerifyReplicas(long genStamp) { - // Set the generation stamp for the block. - setGenerationStamp(genStamp); - if (replicas == null) - return; - - // Remove the replicas with wrong gen stamp. - // The replica list is unchanged. - for (ReplicaUnderConstruction r : replicas) { - if (genStamp != r.getGenerationStamp()) { - r.getExpectedStorageLocation().removeBlock(this); - NameNode.blockStateChangeLog.debug("BLOCK* Removing stale replica " - + "from location: {}", r.getExpectedStorageLocation()); - } - } - } - - /** - * Commit block's length and generation stamp as reported by the client. - * Set block state to {@link BlockUCState#COMMITTED}. - * @param block - contains client reported block length and generation - * @throws IOException if block ids are inconsistent. - */ - void commitBlock(Block block) throws IOException { - if(getBlockId() != block.getBlockId()) - throw new IOException("Trying to commit inconsistent block: id = " - + block.getBlockId() + ", expected id = " + getBlockId()); - blockUCState = BlockUCState.COMMITTED; - this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp()); - // Sort out invalid replicas. - setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); - } - - /** - * Initialize lease recovery for this block. - * Find the first alive data-node starting from the previous primary and - * make it primary. - */ - public void initializeBlockRecovery(long recoveryId) { - setBlockUCState(BlockUCState.UNDER_RECOVERY); - blockRecoveryId = recoveryId; - if (replicas.size() == 0) { - NameNode.blockStateChangeLog.warn("BLOCK*" - + " BlockInfoUnderConstruction.initLeaseRecovery:" - + " No blocks found, lease removed."); - } - boolean allLiveReplicasTriedAsPrimary = true; - for (int i = 0; i < replicas.size(); i++) { - // Check if all replicas have been tried or not. - if (replicas.get(i).isAlive()) { - allLiveReplicasTriedAsPrimary = - (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary()); - } - } - if (allLiveReplicasTriedAsPrimary) { - // Just set all the replicas to be chosen whether they are alive or not. - for (int i = 0; i < replicas.size(); i++) { - replicas.get(i).setChosenAsPrimary(false); - } - } - long mostRecentLastUpdate = 0; - ReplicaUnderConstruction primary = null; - primaryNodeIndex = -1; - for(int i = 0; i < replicas.size(); i++) { - // Skip alive replicas which have been chosen for recovery. - if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) { - continue; - } - final ReplicaUnderConstruction ruc = replicas.get(i); - final long lastUpdate = ruc.getExpectedStorageLocation() - .getDatanodeDescriptor().getLastUpdateMonotonic(); - if (lastUpdate > mostRecentLastUpdate) { - primaryNodeIndex = i; - primary = ruc; - mostRecentLastUpdate = lastUpdate; - } - } - if (primary != null) { - primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this); - primary.setChosenAsPrimary(true); - NameNode.blockStateChangeLog.debug( - "BLOCK* {} recovery started, primary={}", this, primary); - } - } - - void addReplicaIfNotPresent(DatanodeStorageInfo storage, - Block block, - ReplicaState rState) { - Iterator<ReplicaUnderConstruction> it = replicas.iterator(); - while (it.hasNext()) { - ReplicaUnderConstruction r = it.next(); - DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation(); - if(expectedLocation == storage) { - // Record the gen stamp from the report - r.setGenerationStamp(block.getGenerationStamp()); - return; - } else if (expectedLocation != null && - expectedLocation.getDatanodeDescriptor() == - storage.getDatanodeDescriptor()) { - - // The Datanode reported that the block is on a different storage - // than the one chosen by BlockPlacementPolicy. This can occur as - // we allow Datanodes to choose the target storage. Update our - // state by removing the stale entry and adding a new one. - it.remove(); - break; - } - } - replicas.add(new ReplicaUnderConstruction(block, storage, rState)); - } - - @Override // BlockInfo - // BlockInfoUnderConstruction participates in maps the same way as BlockInfo - public int hashCode() { - return super.hashCode(); - } - - @Override // BlockInfo - public boolean equals(Object obj) { - // Sufficient to rely on super's implementation - return (this == obj) || super.equals(obj); - } - - @Override - public String toString() { - final StringBuilder b = new StringBuilder(100); - appendStringTo(b); - return b.toString(); - } - - @Override - public void appendStringTo(StringBuilder sb) { - super.appendStringTo(sb); - appendUCParts(sb); - } - - private void appendUCParts(StringBuilder sb) { - sb.append("{UCState=").append(blockUCState) - .append(", truncateBlock=" + truncateBlock) - .append(", primaryNodeIndex=").append(primaryNodeIndex) - .append(", replicas=["); - if (replicas != null) { - Iterator<ReplicaUnderConstruction> iter = replicas.iterator(); - if (iter.hasNext()) { - iter.next().appendStringTo(sb); - while (iter.hasNext()) { - sb.append(", "); - iter.next().appendStringTo(sb); - } - } - } - sb.append("]}"); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 508da85..fa51491 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.IOException; @@ -617,9 +616,8 @@ public class BlockManager implements BlockStatsMXBean { * @throws IOException if the block does not have at least a minimal number * of replicas reported from data-nodes. */ - private static boolean commitBlock( - final BlockInfoContiguousUnderConstruction block, final Block commitBlock) - throws IOException { + private static boolean commitBlock(final BlockInfo block, + final Block commitBlock) throws IOException { if (block.getBlockUCState() == BlockUCState.COMMITTED) return false; assert block.getNumBytes() <= commitBlock.getNumBytes() : @@ -649,8 +647,7 @@ public class BlockManager implements BlockStatsMXBean { if(lastBlock.isComplete()) return false; // already completed (e.g. by syncBlock) - final boolean b = commitBlock( - (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock); + final boolean b = commitBlock(lastBlock, commitBlock); if(countNodes(lastBlock).liveReplicas() >= minReplication) completeBlock(bc, bc.numBlocks()-1, false); return b; @@ -670,16 +667,15 @@ public class BlockManager implements BlockStatsMXBean { BlockInfo curBlock = bc.getBlocks()[blkIndex]; if(curBlock.isComplete()) return curBlock; - BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction) curBlock; - int numNodes = ucBlock.numNodes(); + + int numNodes = curBlock.numNodes(); if (!force && numNodes < minReplication) throw new IOException("Cannot complete block: " + "block does not satisfy minimal replication requirement."); - if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) + if(!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) throw new IOException( "Cannot complete block: block has not been COMMITTED by the client"); - BlockInfo completeBlock = ucBlock.convertToCompleteBlock(); + BlockInfo completeBlock = curBlock.convertToCompleteBlock(); // replace penultimate block in file bc.setBlock(blkIndex, completeBlock); @@ -713,7 +709,7 @@ public class BlockManager implements BlockStatsMXBean { * when tailing edit logs as a Standby. */ public BlockInfo forceCompleteBlock(final BlockCollection bc, - final BlockInfoContiguousUnderConstruction block) throws IOException { + final BlockInfo block) throws IOException { block.commitBlock(block); return completeBlock(bc, block, true); } @@ -735,28 +731,29 @@ public class BlockManager implements BlockStatsMXBean { */ public LocatedBlock convertLastBlockToUnderConstruction( BlockCollection bc, long bytesToRemove) throws IOException { - BlockInfo oldBlock = bc.getLastBlock(); - if(oldBlock == null || - bc.getPreferredBlockSize() == oldBlock.getNumBytes() - bytesToRemove) + BlockInfo lastBlock = bc.getLastBlock(); + if (lastBlock == null || + bc.getPreferredBlockSize() == lastBlock.getNumBytes() - bytesToRemove) { return null; - assert oldBlock == getStoredBlock(oldBlock) : + } + assert lastBlock == getStoredBlock(lastBlock) : "last block of the file is not in blocksMap"; - DatanodeStorageInfo[] targets = getStorages(oldBlock); + DatanodeStorageInfo[] targets = getStorages(lastBlock); - BlockInfoContiguousUnderConstruction ucBlock = - bc.setLastBlock(oldBlock, targets); - blocksMap.replaceBlock(ucBlock); + // convert the last block to under construction. note no block replacement + // is happening + bc.convertLastBlockToUC(lastBlock, targets); // Remove block from replication queue. - NumberReplicas replicas = countNodes(ucBlock); - neededReplications.remove(ucBlock, replicas.liveReplicas(), - replicas.decommissionedAndDecommissioning(), getReplication(ucBlock)); - pendingReplications.remove(ucBlock); + NumberReplicas replicas = countNodes(lastBlock); + neededReplications.remove(lastBlock, replicas.liveReplicas(), + replicas.decommissionedAndDecommissioning(), getReplication(lastBlock)); + pendingReplications.remove(lastBlock); // remove this block from the list of pending blocks to be deleted. for (DatanodeStorageInfo storage : targets) { - invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock); + invalidateBlocks.remove(storage.getDatanodeDescriptor(), lastBlock); } // Adjust safe-mode totals, since under-construction blocks don't @@ -767,9 +764,11 @@ public class BlockManager implements BlockStatsMXBean { // always decrement total blocks -1); - final long fileLength = bc.computeContentSummary(getStoragePolicySuite()).getLength(); - final long pos = fileLength - ucBlock.getNumBytes(); - return createLocatedBlock(ucBlock, pos, BlockTokenIdentifier.AccessMode.WRITE); + final long fileLength = bc.computeContentSummary( + getStoragePolicySuite()).getLength(); + final long pos = fileLength - lastBlock.getNumBytes(); + return createLocatedBlock(lastBlock, pos, + BlockTokenIdentifier.AccessMode.WRITE); } /** @@ -846,15 +845,9 @@ public class BlockManager implements BlockStatsMXBean { /** @return a LocatedBlock for the given block */ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos ) throws IOException { - if (blk instanceof BlockInfoContiguousUnderConstruction) { - if (blk.isComplete()) { - throw new IOException( - "blk instanceof BlockInfoUnderConstruction && blk.isComplete()" - + ", blk=" + blk); - } - final BlockInfoContiguousUnderConstruction uc = - (BlockInfoContiguousUnderConstruction) blk; - final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); + if (!blk.isComplete()) { + final DatanodeStorageInfo[] storages = blk.getUnderConstructionFeature() + .getExpectedStorageLocations(); final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); return newLocatedBlock(eb, storages, pos, false); } @@ -1761,12 +1754,13 @@ public class BlockManager implements BlockStatsMXBean { * reported by the datanode in the block report. */ static class StatefulBlockInfo { - final BlockInfoContiguousUnderConstruction storedBlock; + final BlockInfo storedBlock; final Block reportedBlock; final ReplicaState reportedState; - StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock, + StatefulBlockInfo(BlockInfo storedBlock, Block reportedBlock, ReplicaState reportedState) { + Preconditions.checkArgument(!storedBlock.isComplete()); this.storedBlock = storedBlock; this.reportedBlock = reportedBlock; this.reportedState = reportedState; @@ -2165,15 +2159,14 @@ public class BlockManager implements BlockStatsMXBean { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - ((BlockInfoContiguousUnderConstruction)storedBlock) + storedBlock.getUnderConstructionFeature() .addReplicaIfNotPresent(storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 - BlockInfoContiguousUnderConstruction blockUC = - (BlockInfoContiguousUnderConstruction) storedBlock; - if (namesystem.isInSnapshot(blockUC)) { - int numOfReplicas = blockUC.getNumExpectedLocations(); + if (namesystem.isInSnapshot(storedBlock)) { + int numOfReplicas = storedBlock.getUnderConstructionFeature() + .getNumExpectedLocations(); namesystem.incrementSafeBlockCount(numOfReplicas); } //and fall through to next clause @@ -2298,11 +2291,6 @@ public class BlockManager implements BlockStatsMXBean { // Ignore replicas already scheduled to be removed from the DN if(invalidateBlocks.contains(dn, block)) { - /* - * TODO: following assertion is incorrect, see HDFS-2668 assert - * storedBlock.findDatanode(dn) < 0 : "Block " + block + - * " in recentInvalidatesSet should not appear in DN " + dn; - */ return storedBlock; } @@ -2325,8 +2313,7 @@ public class BlockManager implements BlockStatsMXBean { } if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - toUC.add(new StatefulBlockInfo( - (BlockInfoContiguousUnderConstruction) storedBlock, + toUC.add(new StatefulBlockInfo(storedBlock, new Block(block), reportedState)); return storedBlock; } @@ -2517,8 +2504,8 @@ public class BlockManager implements BlockStatsMXBean { void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, DatanodeStorageInfo storageInfo) throws IOException { - BlockInfoContiguousUnderConstruction block = ucBlock.storedBlock; - block.addReplicaIfNotPresent( + BlockInfo block = ucBlock.storedBlock; + block.getUnderConstructionFeature().addReplicaIfNotPresent( storageInfo, ucBlock.reportedBlock, ucBlock.reportedState); if (ucBlock.reportedState == ReplicaState.FINALIZED && @@ -2578,7 +2565,7 @@ public class BlockManager implements BlockStatsMXBean { assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - if (block instanceof BlockInfoContiguousUnderConstruction) { + if (!block.isComplete()) { //refresh our copy in case the block got completed in another thread storedBlock = blocksMap.getStoredBlock(block); } else { @@ -3533,11 +3520,9 @@ public class BlockManager implements BlockStatsMXBean { String src, BlockInfo[] blocks) { for (BlockInfo b: blocks) { if (!b.isComplete()) { - final BlockInfoContiguousUnderConstruction uc = - (BlockInfoContiguousUnderConstruction)b; final int numNodes = b.numNodes(); LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " - + uc.getBlockUCState() + ", replication# = " + numNodes + + b.getBlockUCState() + ", replication# = " + numNodes + (numNodes < minReplication ? " < ": " >= ") + " minimum = " + minReplication + ") in file " + src); return false; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java new file mode 100644 index 0000000..a555cd6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockUnderConstructionFeature.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.namenode.NameNode; + +/** + * Represents a block that is currently being constructed.<br> + * This is usually the last block of a file opened for write or append. + */ +public class BlockUnderConstructionFeature { + /** Block state. See {@link BlockUCState} */ + private BlockUCState blockUCState; + + /** + * Block replicas as assigned when the block was allocated. + * This defines the pipeline order. + */ + private List<ReplicaUnderConstruction> replicas; + + /** + * Index of the primary data node doing the recovery. Useful for log + * messages. + */ + private int primaryNodeIndex = -1; + + /** + * The new generation stamp, which this block will have + * after the recovery succeeds. Also used as a recovery id to identify + * the right recovery if any of the abandoned recoveries re-appear. + */ + private long blockRecoveryId = 0; + + /** + * The block source to use in the event of copy-on-write truncate. + */ + private Block truncateBlock; + + /** + * ReplicaUnderConstruction contains information about replicas while + * they are under construction. + * The GS, the length and the state of the replica is as reported by + * the data-node. + * It is not guaranteed, but expected, that data-nodes actually have + * corresponding replicas. + */ + static class ReplicaUnderConstruction { + private long generationStamp; + private final DatanodeStorageInfo expectedLocation; + private ReplicaState state; + private boolean chosenAsPrimary; + + ReplicaUnderConstruction(long generationStamp, DatanodeStorageInfo target, + ReplicaState state) { + this.generationStamp = generationStamp; + this.expectedLocation = target; + this.state = state; + this.chosenAsPrimary = false; + } + + long getGenerationStamp() { + return this.generationStamp; + } + + void setGenerationStamp(long generationStamp) { + this.generationStamp = generationStamp; + } + + /** + * Expected block replica location as assigned when the block was allocated. + * This defines the pipeline order. + * It is not guaranteed, but expected, that the data-node actually has + * the replica. + */ + DatanodeStorageInfo getExpectedStorageLocation() { + return expectedLocation; + } + + /** + * Get replica state as reported by the data-node. + */ + ReplicaState getState() { + return state; + } + + /** + * Whether the replica was chosen for recovery. + */ + boolean getChosenAsPrimary() { + return chosenAsPrimary; + } + + /** + * Set replica state. + */ + void setState(ReplicaState s) { + state = s; + } + + /** + * Set whether this replica was chosen for recovery. + */ + void setChosenAsPrimary(boolean chosenAsPrimary) { + this.chosenAsPrimary = chosenAsPrimary; + } + + /** + * Is data-node the replica belongs to alive. + */ + boolean isAlive() { + return expectedLocation.getDatanodeDescriptor().isAlive; + } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(50) + .append("ReplicaUC[") + .append(expectedLocation) + .append("|") + .append(state) + .append("]"); + return b.toString(); + } + } + + /** + * Create a block that is currently being constructed. + */ + public BlockUnderConstructionFeature(Block block, BlockUCState state, + DatanodeStorageInfo[] targets) { + assert getBlockUCState() != BlockUCState.COMPLETE : + "BlockInfoUnderConstruction cannot be in COMPLETE state"; + this.blockUCState = state; + setExpectedLocations(block.getGenerationStamp(), targets); + } + + /** Set expected locations */ + public void setExpectedLocations(long generationStamp, + DatanodeStorageInfo[] targets) { + int numLocations = targets == null ? 0 : targets.length; + this.replicas = new ArrayList<>(numLocations); + for(int i = 0; i < numLocations; i++) { + replicas.add(new ReplicaUnderConstruction(generationStamp, targets[i], + ReplicaState.RBW)); + } + } + + /** + * Create array of expected replica locations + * (as has been assigned by chooseTargets()). + */ + public DatanodeStorageInfo[] getExpectedStorageLocations() { + int numLocations = replicas == null ? 0 : replicas.size(); + DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; + for (int i = 0; i < numLocations; i++) { + storages[i] = replicas.get(i).getExpectedStorageLocation(); + } + return storages; + } + + /** Get the number of expected locations */ + public int getNumExpectedLocations() { + return replicas == null ? 0 : replicas.size(); + } + + /** + * Return the state of the block under construction. + * @see BlockUCState + */ + public BlockUCState getBlockUCState() { + return blockUCState; + } + + void setBlockUCState(BlockUCState s) { + blockUCState = s; + } + + /** Get block recovery ID */ + public long getBlockRecoveryId() { + return blockRecoveryId; + } + + /** Get recover block */ + public Block getTruncateBlock() { + return truncateBlock; + } + + public void setTruncateBlock(Block recoveryBlock) { + this.truncateBlock = recoveryBlock; + } + + /** + * Set {@link #blockUCState} to {@link BlockUCState#COMMITTED}. + */ + void commit() { + blockUCState = BlockUCState.COMMITTED; + } + + List<ReplicaUnderConstruction> getStaleReplicas(long genStamp) { + List<ReplicaUnderConstruction> staleReplicas = new ArrayList<>(); + if (replicas != null) { + // Remove replicas with wrong gen stamp. The replica list is unchanged. + for (ReplicaUnderConstruction r : replicas) { + if (genStamp != r.getGenerationStamp()) { + staleReplicas.add(r); + } + } + } + return staleReplicas; + } + + /** + * Initialize lease recovery for this block. + * Find the first alive data-node starting from the previous primary and + * make it primary. + */ + public void initializeBlockRecovery(BlockInfo block, long recoveryId) { + setBlockUCState(BlockUCState.UNDER_RECOVERY); + blockRecoveryId = recoveryId; + if (replicas.size() == 0) { + NameNode.blockStateChangeLog.warn("BLOCK*" + + " BlockInfoUnderConstruction.initLeaseRecovery:" + + " No blocks found, lease removed."); + } + boolean allLiveReplicasTriedAsPrimary = true; + for (ReplicaUnderConstruction replica : replicas) { + // Check if all replicas have been tried or not. + if (replica.isAlive()) { + allLiveReplicasTriedAsPrimary = allLiveReplicasTriedAsPrimary + && replica.getChosenAsPrimary(); + } + } + if (allLiveReplicasTriedAsPrimary) { + // Just set all the replicas to be chosen whether they are alive or not. + for (ReplicaUnderConstruction replica : replicas) { + replica.setChosenAsPrimary(false); + } + } + long mostRecentLastUpdate = 0; + ReplicaUnderConstruction primary = null; + primaryNodeIndex = -1; + for(int i = 0; i < replicas.size(); i++) { + // Skip alive replicas which have been chosen for recovery. + if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) { + continue; + } + final ReplicaUnderConstruction ruc = replicas.get(i); + final long lastUpdate = ruc.getExpectedStorageLocation() + .getDatanodeDescriptor().getLastUpdateMonotonic(); + if (lastUpdate > mostRecentLastUpdate) { + primaryNodeIndex = i; + primary = ruc; + mostRecentLastUpdate = lastUpdate; + } + } + if (primary != null) { + primary.getExpectedStorageLocation().getDatanodeDescriptor() + .addBlockToBeRecovered(block); + primary.setChosenAsPrimary(true); + NameNode.blockStateChangeLog.debug( + "BLOCK* {} recovery started, primary={}", this, primary); + } + } + + void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block block, + ReplicaState rState) { + Iterator<ReplicaUnderConstruction> it = replicas.iterator(); + while (it.hasNext()) { + ReplicaUnderConstruction r = it.next(); + DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation(); + if (expectedLocation == storage) { + // Record the gen stamp from the report + r.setGenerationStamp(block.getGenerationStamp()); + return; + } else if (expectedLocation != null && + expectedLocation.getDatanodeDescriptor() == + storage.getDatanodeDescriptor()) { + // The Datanode reported that the block is on a different storage + // than the one chosen by BlockPlacementPolicy. This can occur as + // we allow Datanodes to choose the target storage. Update our + // state by removing the stale entry and adding a new one. + it.remove(); + break; + } + } + replicas.add(new ReplicaUnderConstruction(block.getGenerationStamp(), storage, rState)); + } + + @Override + public String toString() { + final StringBuilder b = new StringBuilder(100); + appendUCParts(b); + return b.toString(); + } + + private void appendUCParts(StringBuilder sb) { + sb.append("{UCState=").append(blockUCState) + .append(", truncateBlock=").append(truncateBlock) + .append(", primaryNodeIndex=").append(primaryNodeIndex) + .append(", replicas=["); + if (replicas != null) { + Iterator<ReplicaUnderConstruction> iter = replicas.iterator(); + if (iter.hasNext()) { + sb.append(iter.next()); + while (iter.hasNext()) { + sb.append(", "); + sb.append(iter.next()); + } + } + } + sb.append("]}"); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 87ce753..9334b5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -222,8 +222,7 @@ public class DatanodeDescriptor extends DatanodeInfo { private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ - private final BlockQueue<BlockInfoContiguousUnderConstruction> recoverBlocks = - new BlockQueue<BlockInfoContiguousUnderConstruction>(); + private final BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>(); /** A set of blocks to be invalidated by this datanode */ private final LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<>(); @@ -602,7 +601,7 @@ public class DatanodeDescriptor extends DatanodeInfo { /** * Store block recovery work. */ - void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) { + void addBlockToBeRecovered(BlockInfo block) { if(recoverBlocks.contains(block)) { // this prevents adding the same block twice to the recovery queue BlockManager.LOG.info(block + " is already in the recovery queue"); @@ -644,11 +643,11 @@ public class DatanodeDescriptor extends DatanodeInfo { return replicateBlocks.poll(maxTransfers); } - public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) { - List<BlockInfoContiguousUnderConstruction> blocks = recoverBlocks.poll(maxTransfers); + public BlockInfo[] getLeaseRecoveryCommand(int maxTransfers) { + List<BlockInfo> blocks = recoverBlocks.poll(maxTransfers); if(blocks == null) return null; - return blocks.toArray(new BlockInfoContiguousUnderConstruction[blocks.size()]); + return blocks.toArray(new BlockInfo[blocks.size()]); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 3397bbb..d1900f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1380,13 +1380,14 @@ public class DatanodeManager { } //check lease recovery - BlockInfoContiguousUnderConstruction[] blocks = nodeinfo - .getLeaseRecoveryCommand(Integer.MAX_VALUE); + BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE); if (blocks != null) { BlockRecoveryCommand brCommand = new BlockRecoveryCommand( blocks.length); - for (BlockInfoContiguousUnderConstruction b : blocks) { - final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations(); + for (BlockInfo b : blocks) { + BlockUnderConstructionFeature uc = b.getUnderConstructionFeature(); + assert uc != null; + final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); // Skip stale nodes during recovery - not heart beated for some time (30s by default). final List<DatanodeStorageInfo> recoveryLocations = new ArrayList<>(storages.length); @@ -1397,11 +1398,11 @@ public class DatanodeManager { } // If we are performing a truncate recovery than set recovery fields // to old block. - boolean truncateRecovery = b.getTruncateBlock() != null; + boolean truncateRecovery = uc.getTruncateBlock() != null; boolean copyOnTruncateRecovery = truncateRecovery && - b.getTruncateBlock().getBlockId() != b.getBlockId(); + uc.getTruncateBlock().getBlockId() != b.getBlockId(); ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ? - new ExtendedBlock(blockPoolId, b.getTruncateBlock()) : + new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) : new ExtendedBlock(blockPoolId, b); // If we only get 1 replica after eliminating stale nodes, then choose all // replicas for recovery and let the primary data node handle failures. @@ -1420,12 +1421,12 @@ public class DatanodeManager { } if(truncateRecovery) { Block recoveryBlock = (copyOnTruncateRecovery) ? b : - b.getTruncateBlock(); + uc.getTruncateBlock(); brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, recoveryBlock)); } else { brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, - b.getBlockRecoveryId())); + uc.getBlockRecoveryId())); } } return new DatanodeCommand[] { brCommand }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java index 474c257..6d37530 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirTruncateOp.java @@ -28,8 +28,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -95,7 +96,7 @@ final class FSDirTruncateOp { final BlockInfo last = file.getLastBlock(); if (last != null && last.getBlockUCState() == BlockUCState.UNDER_RECOVERY) { - final Block truncatedBlock = ((BlockInfoContiguousUnderConstruction) last) + final Block truncatedBlock = last.getUnderConstructionFeature() .getTruncateBlock(); if (truncatedBlock != null) { final long truncateLength = file.computeFileSize(false, false) @@ -222,42 +223,42 @@ final class FSDirTruncateOp { oldBlock))); } - BlockInfoContiguousUnderConstruction truncatedBlockUC; + final BlockInfo truncatedBlockUC; BlockManager blockManager = fsn.getFSDirectory().getBlockManager(); if (shouldCopyOnTruncate) { // Add new truncateBlock into blocksMap and // use oldBlock as a source for copy-on-truncate recovery - truncatedBlockUC = new BlockInfoContiguousUnderConstruction(newBlock, + truncatedBlockUC = new BlockInfoContiguous(newBlock, file.getPreferredBlockReplication()); + truncatedBlockUC.convertToBlockUnderConstruction( + BlockUCState.UNDER_CONSTRUCTION, blockManager.getStorages(oldBlock)); truncatedBlockUC.setNumBytes(oldBlock.getNumBytes() - lastBlockDelta); - truncatedBlockUC.setTruncateBlock(oldBlock); - file.setLastBlock(truncatedBlockUC, blockManager.getStorages(oldBlock)); + truncatedBlockUC.getUnderConstructionFeature().setTruncateBlock(oldBlock); + file.setLastBlock(truncatedBlockUC); blockManager.addBlockCollection(truncatedBlockUC, file); NameNode.stateChangeLog.debug( "BLOCK* prepareFileForTruncate: Scheduling copy-on-truncate to new" + " size {} new block {} old block {}", - truncatedBlockUC.getNumBytes(), newBlock, - truncatedBlockUC.getTruncateBlock()); + truncatedBlockUC.getNumBytes(), newBlock, oldBlock); } else { // Use new generation stamp for in-place truncate recovery blockManager.convertLastBlockToUnderConstruction(file, lastBlockDelta); oldBlock = file.getLastBlock(); assert !oldBlock.isComplete() : "oldBlock should be under construction"; - truncatedBlockUC = (BlockInfoContiguousUnderConstruction) oldBlock; - truncatedBlockUC.setTruncateBlock(new Block(oldBlock)); - truncatedBlockUC.getTruncateBlock().setNumBytes( - oldBlock.getNumBytes() - lastBlockDelta); - truncatedBlockUC.getTruncateBlock().setGenerationStamp( - newBlock.getGenerationStamp()); + BlockUnderConstructionFeature uc = oldBlock.getUnderConstructionFeature(); + uc.setTruncateBlock(new Block(oldBlock)); + uc.getTruncateBlock().setNumBytes(oldBlock.getNumBytes() - lastBlockDelta); + uc.getTruncateBlock().setGenerationStamp(newBlock.getGenerationStamp()); + truncatedBlockUC = oldBlock; - NameNode.stateChangeLog.debug( - "BLOCK* prepareFileForTruncate: {} Scheduling in-place block " - + "truncate to new size {}", truncatedBlockUC.getTruncateBlock() - .getNumBytes(), truncatedBlockUC); + NameNode.stateChangeLog.debug("BLOCK* prepareFileForTruncate: " + + "{} Scheduling in-place block truncate to new size {}", + uc, uc.getTruncateBlock().getNumBytes()); } if (shouldRecoverNow) { - truncatedBlockUC.initializeBlockRecovery(newBlock.getGenerationStamp()); + truncatedBlockUC.getUnderConstructionFeature().initializeBlockRecovery( + truncatedBlockUC, newBlock.getGenerationStamp()); } return newBlock; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 3d30a19..f04bec2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -43,8 +43,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; @@ -73,7 +74,7 @@ class FSDirWriteFileOp { Block block) throws IOException { // modify file-> block and blocksMap // fileNode should be under construction - BlockInfoContiguousUnderConstruction uc = fileNode.removeLastBlock(block); + BlockInfo uc = fileNode.removeLastBlock(block); if (uc == null) { return false; } @@ -236,8 +237,7 @@ class FSDirWriteFileOp { } else { // add new chosen targets to already allocated block and return BlockInfo lastBlockInFile = pendingFile.getLastBlock(); - ((BlockInfoContiguousUnderConstruction) lastBlockInFile) - .setExpectedLocations(targets); + lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(lastBlockInFile.getGenerationStamp(), targets); offset = pendingFile.computeFileSize(); return makeLocatedBlock(fsn, lastBlockInFile, targets, offset); } @@ -520,12 +520,10 @@ class FSDirWriteFileOp { fileINode.getPreferredBlockReplication(), true); // associate new last block for the file - BlockInfoContiguousUnderConstruction blockInfo = - new BlockInfoContiguousUnderConstruction( - block, - fileINode.getFileReplication(), - HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, - targets); + BlockInfo blockInfo = new BlockInfoContiguous(block, + fileINode.getFileReplication()); + blockInfo.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); fsd.getBlockManager().addBlockCollection(blockInfo, fileINode); fileINode.addBlock(blockInfo); @@ -662,10 +660,10 @@ class FSDirWriteFileOp { "allocation of a new block in " + src + ". Returning previously" + " allocated block " + lastBlockInFile); long offset = file.computeFileSize(); - BlockInfoContiguousUnderConstruction lastBlockUC = - (BlockInfoContiguousUnderConstruction) lastBlockInFile; + BlockUnderConstructionFeature uc = + lastBlockInFile.getUnderConstructionFeature(); onRetryBlock[0] = makeLocatedBlock(fsn, lastBlockInFile, - lastBlockUC.getExpectedStorageLocations(), offset); + uc.getExpectedStorageLocations(), offset); return new FileState(file, src, iip); } else { // Case 3 http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 3dd076d..dfe897a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; @@ -950,7 +949,7 @@ public class FSEditLogLoader { if (pBlock != null) { // the penultimate block is not null Preconditions.checkState(oldBlocks != null && oldBlocks.length > 0); // compare pBlock with the last block of oldBlocks - Block oldLastBlock = oldBlocks[oldBlocks.length - 1]; + BlockInfo oldLastBlock = oldBlocks[oldBlocks.length - 1]; if (oldLastBlock.getBlockId() != pBlock.getBlockId() || oldLastBlock.getGenerationStamp() != pBlock.getGenerationStamp()) { throw new IOException( @@ -960,17 +959,18 @@ public class FSEditLogLoader { } oldLastBlock.setNumBytes(pBlock.getNumBytes()); - if (oldLastBlock instanceof BlockInfoContiguousUnderConstruction) { - fsNamesys.getBlockManager().forceCompleteBlock(file, - (BlockInfoContiguousUnderConstruction) oldLastBlock); + if (!oldLastBlock.isComplete()) { + fsNamesys.getBlockManager().forceCompleteBlock(file, oldLastBlock); fsNamesys.getBlockManager().processQueuedMessagesForBlock(pBlock); } } else { // the penultimate block is null Preconditions.checkState(oldBlocks == null || oldBlocks.length == 0); } // add the new block - BlockInfo newBI = new BlockInfoContiguousUnderConstruction( - newBlock, file.getPreferredBlockReplication()); + BlockInfo newBI = new BlockInfoContiguous(newBlock, + file.getPreferredBlockReplication()); + newBI.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null); fsNamesys.getBlockManager().addBlockCollection(newBI, file); file.addBlock(newBI); fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock); @@ -1010,11 +1010,10 @@ public class FSEditLogLoader { oldBlock.getGenerationStamp() != newBlock.getGenerationStamp(); oldBlock.setGenerationStamp(newBlock.getGenerationStamp()); - if (oldBlock instanceof BlockInfoContiguousUnderConstruction && + if (!oldBlock.isComplete() && (!isLastBlock || op.shouldCompleteLastBlock())) { changeMade = true; - fsNamesys.getBlockManager().forceCompleteBlock(file, - (BlockInfoContiguousUnderConstruction) oldBlock); + fsNamesys.getBlockManager().forceCompleteBlock(file, oldBlock); } if (changeMade) { // The state or gen-stamp of the block has changed. So, we may be @@ -1049,8 +1048,10 @@ public class FSEditLogLoader { // TODO: shouldn't this only be true for the last block? // what about an old-version fsync() where fsync isn't called // until several blocks in? - newBI = new BlockInfoContiguousUnderConstruction( - newBlock, file.getPreferredBlockReplication()); + newBI = new BlockInfoContiguous(newBlock, + file.getPreferredBlockReplication()); + newBI.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null); } else { // OP_CLOSE should add finalized blocks. This code path // is only executed when loading edits written by prior http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java index 30517d0..92f333a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java @@ -54,7 +54,6 @@ import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; @@ -777,8 +776,8 @@ public class FSImageFormat { // convert the last block to BlockUC if (blocks.length > 0) { BlockInfo lastBlk = blocks[blocks.length - 1]; - blocks[blocks.length - 1] = new BlockInfoContiguousUnderConstruction( - lastBlk, replication); + lastBlk.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index e8378e5..25fd99d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -44,8 +44,8 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.LoaderContext; import org.apache.hadoop.hdfs.server.namenode.FSImageFormatProtobuf.SaverContext; import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary; @@ -363,9 +363,8 @@ public final class FSImageFormatPBINode { file.toUnderConstruction(uc.getClientName(), uc.getClientMachine()); if (blocks.length > 0) { BlockInfo lastBlk = file.getLastBlock(); - // replace the last block of file - file.setBlock(file.numBlocks() - 1, new BlockInfoContiguousUnderConstruction( - lastBlk, replication)); + lastBlk.convertToBlockUnderConstruction( + HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, null); } } return file; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java index f71cf0b..96e4ecb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap; @@ -137,8 +136,9 @@ public class FSImageSerialization { // last block is UNDER_CONSTRUCTION if(numBlocks > 0) { blk.readFields(in); - blocks[i] = new BlockInfoContiguousUnderConstruction( - blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null); + blocks[i] = new BlockInfoContiguous(blk, blockReplication); + blocks[i].convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, + null); } PermissionStatus perm = PermissionStatus.read(in); String clientName = readString(in); @@ -180,9 +180,9 @@ public class FSImageSerialization { /** * Serialize a {@link INodeFile} node - * @param node The node to write + * @param file The INodeFile to write * @param out The {@link DataOutputStream} where the fields are written - * @param writeBlock Whether to write block information + * @param writeUnderConstruction Whether to write under construction information */ public static void writeINodeFile(INodeFile file, DataOutput out, boolean writeUnderConstruction) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d34242c..4c71cd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -142,7 +142,6 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; @@ -199,8 +198,8 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretMan import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockUnderConstructionFeature; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics; @@ -3059,8 +3058,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, BlockInfo penultimateBlock = pendingFile.getPenultimateBlock(); // If penultimate block doesn't exist then its minReplication is met - boolean penultimateBlockMinReplication = penultimateBlock == null ? true : - blockManager.checkMinReplication(penultimateBlock); + boolean penultimateBlockMinReplication = penultimateBlock == null + || blockManager.checkMinReplication(penultimateBlock); switch(lastBlockState) { case COMPLETE: @@ -3089,24 +3088,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, throw new AlreadyBeingCreatedException(message); case UNDER_CONSTRUCTION: case UNDER_RECOVERY: - final BlockInfoContiguousUnderConstruction uc = (BlockInfoContiguousUnderConstruction)lastBlock; + BlockUnderConstructionFeature uc = lastBlock.getUnderConstructionFeature(); // determine if last block was intended to be truncated Block recoveryBlock = uc.getTruncateBlock(); boolean truncateRecovery = recoveryBlock != null; boolean copyOnTruncate = truncateRecovery && - recoveryBlock.getBlockId() != uc.getBlockId(); + recoveryBlock.getBlockId() != lastBlock.getBlockId(); assert !copyOnTruncate || - recoveryBlock.getBlockId() < uc.getBlockId() && - recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() && - recoveryBlock.getNumBytes() > uc.getNumBytes() : + recoveryBlock.getBlockId() < lastBlock.getBlockId() && + recoveryBlock.getGenerationStamp() < lastBlock.getGenerationStamp() && + recoveryBlock.getNumBytes() > lastBlock.getNumBytes() : "wrong recoveryBlock"; // setup the last block locations from the blockManager if not known if (uc.getNumExpectedLocations() == 0) { - uc.setExpectedLocations(blockManager.getStorages(lastBlock)); + uc.setExpectedLocations(lastBlock.getGenerationStamp(), + blockManager.getStorages(lastBlock)); } - if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) { + if (uc.getNumExpectedLocations() == 0 && lastBlock.getNumBytes() == 0) { // There is no datanode reported to this block. // may be client have crashed before writing data to pipeline. // This blocks doesn't need any recovery. @@ -3119,14 +3119,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return true; } // start recovery of the last block for this file - long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc)); + long blockRecoveryId = nextGenerationStamp( + blockIdManager.isLegacyBlock(lastBlock)); lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); if(copyOnTruncate) { - uc.setGenerationStamp(blockRecoveryId); + lastBlock.setGenerationStamp(blockRecoveryId); } else if(truncateRecovery) { recoveryBlock.setGenerationStamp(blockRecoveryId); } - uc.initializeBlockRecovery(blockRecoveryId); + uc.initializeBlockRecovery(lastBlock, blockRecoveryId); leaseManager.renewLease(lease); // Cannot close file right now, since the last block requires recovery. // This may potentially cause infinite loop in lease recovery @@ -3205,7 +3206,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } @Override - public boolean isInSnapshot(BlockInfoContiguousUnderConstruction blockUC) { + public boolean isInSnapshot(BlockInfo blockUC) { assert hasReadLock(); final BlockCollection bc = blockUC.getBlockCollection(); if (bc == null || !(bc instanceof INodeFile) @@ -3252,7 +3253,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, waitForLoadingFSImage(); writeLock(); boolean copyTruncate = false; - BlockInfoContiguousUnderConstruction truncatedBlock = null; + BlockInfo truncatedBlock = null; try { checkOperation(OperationCategory.WRITE); // If a DN tries to commit to the standby, the recovery will @@ -3309,9 +3310,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return; } - truncatedBlock = (BlockInfoContiguousUnderConstruction) iFile - .getLastBlock(); - long recoveryId = truncatedBlock.getBlockRecoveryId(); + truncatedBlock = iFile.getLastBlock(); + long recoveryId = truncatedBlock.getUnderConstructionFeature() + .getBlockRecoveryId(); copyTruncate = truncatedBlock.getBlockId() != storedBlock.getBlockId(); if(recoveryId != newgenerationstamp) { throw new IOException("The recovery id " + newgenerationstamp @@ -3374,9 +3375,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]), trimmedStorages.toArray(new String[trimmedStorages.size()])); if(copyTruncate) { - iFile.setLastBlock(truncatedBlock, trimmedStorageInfos); + iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos); } else { - iFile.setLastBlock(storedBlock, trimmedStorageInfos); + iFile.convertLastBlockToUC(storedBlock, trimmedStorageInfos); if (closeFile) { blockManager.markBlockReplicasAsCorrupt(storedBlock, oldGenerationStamp, oldNumBytes, trimmedStorageInfos); @@ -5351,8 +5352,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, assert hasWriteLock(); // check the vadility of the block and lease holder name final INodeFile pendingFile = checkUCBlock(oldBlock, clientName); - final BlockInfoContiguousUnderConstruction blockinfo - = (BlockInfoContiguousUnderConstruction)pendingFile.getLastBlock(); + final BlockInfo blockinfo = pendingFile.getLastBlock(); + assert !blockinfo.isComplete(); // check new GS & length: this is not expected if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() || @@ -5371,7 +5372,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // find the DatanodeDescriptor objects final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager() .getDatanodeStorageInfos(newNodes, newStorageIDs); - blockinfo.setExpectedLocations(storages); + blockinfo.getUnderConstructionFeature().setExpectedLocations( + blockinfo.getGenerationStamp(), storages); String src = pendingFile.getFullPathName(); FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e535e0f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java index d07ae1f..26ee8f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java @@ -21,7 +21,6 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; /** @@ -61,7 +60,7 @@ public class FileUnderConstructionFeature implements INode.Feature { BlockInfo lastBlock = f.getLastBlock(); assert (lastBlock != null) : "The last block for path " + f.getFullPathName() + " is null when updating its length"; - assert (lastBlock instanceof BlockInfoContiguousUnderConstruction) + assert !lastBlock.isComplete() : "The last block for path " + f.getFullPathName() + " is not a BlockInfoUnderConstruction when updating its length"; lastBlock.setNumBytes(lastBlockLength); @@ -76,9 +75,8 @@ public class FileUnderConstructionFeature implements INode.Feature { final BlocksMapUpdateInfo collectedBlocks) { final BlockInfo[] blocks = f.getBlocks(); if (blocks != null && blocks.length > 0 - && blocks[blocks.length - 1] instanceof BlockInfoContiguousUnderConstruction) { - BlockInfoContiguousUnderConstruction lastUC = - (BlockInfoContiguousUnderConstruction) blocks[blocks.length - 1]; + && !blocks[blocks.length - 1].isComplete()) { + BlockInfo lastUC = blocks[blocks.length - 1]; if (lastUC.getNumBytes() == 0) { // this is a 0-sized block. do not need check its UC state here collectedBlocks.addDeleteBlock(lastUC);
