http://git-wip-us.apache.org/repos/asf/hadoop/blob/3100002d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index da09b0e,29bcd79..c93a362 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@@ -2816,30 -2633,14 +2831,30 @@@ public class DataNode extends Reconfigu } /** - * Convenience method, which unwraps RemoteException. - * @throws IOException not a RemoteException. - */ - * Update replica with the new generation stamp and length. ++ * Convenience method, which unwraps RemoteException. ++ * @throws IOException not a RemoteException. ++ */ + private static ReplicaRecoveryInfo callInitReplicaRecovery( + InterDatanodeProtocol datanode, + RecoveringBlock rBlock) throws IOException { + try { + return datanode.initReplicaRecovery(rBlock); - } catch(RemoteException re) { ++ } catch (RemoteException re) { + throw re.unwrapRemoteException(); + } + } + + /** - * Update replica with the new generation stamp and length. ++ * Update replica with the new generation stamp and length. */ @Override // InterDatanodeProtocol public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock, -- final long recoveryId, final long newBlockId, final long newLength) ++ final long recoveryId, final long newBlockId, final long newLength) throws IOException { - final String storageID = data.updateReplicaUnderRecovery(oldBlock, - recoveryId, newBlockId, newLength); + final FsDatasetSpi<?> dataset = + (FsDatasetSpi<?>) getDataset(oldBlock.getBlockPoolId()); + final String storageID = dataset.updateReplicaUnderRecovery( + oldBlock, recoveryId, newBlockId, newLength); // Notify the namenode of the updated block info. This is important // for HA, since otherwise the standby node may lose track of the // block locations until the next block report. @@@ -2851,234 -2652,6 +2866,244 @@@ return storageID; } - /** A convenient class used in block recovery */ - static class BlockRecord { ++ /** ++ * A convenient class used in block recovery ++ */ ++ static class BlockRecord { + final DatanodeID id; + final InterDatanodeProtocol datanode; + final ReplicaRecoveryInfo rInfo; - + private String storageID; + + BlockRecord(DatanodeID id, + InterDatanodeProtocol datanode, + ReplicaRecoveryInfo rInfo) { + this.id = id; + this.datanode = datanode; + this.rInfo = rInfo; + } + + void updateReplicaUnderRecovery(String bpid, long recoveryId, + long newBlockId, long newLength) + throws IOException { + final ExtendedBlock b = new ExtendedBlock(bpid, rInfo); + storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId, + newLength); + } + + @Override + public String toString() { + return "block:" + rInfo + " node:" + id; + } + } + - /** Recover a block */ ++ ++ /** ++ * Recover a block ++ */ + private void recoverBlock(RecoveringBlock rBlock) throws IOException { + ExtendedBlock block = rBlock.getBlock(); + String blookPoolId = block.getBlockPoolId(); + DatanodeID[] datanodeids = rBlock.getLocations(); + List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length); + int errorCount = 0; + + //check generation stamps - for(DatanodeID id : datanodeids) { ++ for (DatanodeID id : datanodeids) { + try { + BPOfferService bpos = blockPoolManager.get(blookPoolId); + DatanodeRegistration bpReg = bpos.bpRegistration; - InterDatanodeProtocol datanode = bpReg.equals(id)? - this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), - dnConf.socketTimeout, dnConf.connectToDnViaHostname); ++ InterDatanodeProtocol datanode = bpReg.equals(id) ? ++ this : DataNode.createInterDataNodeProtocolProxy(id, getConf(), ++ dnConf.socketTimeout, dnConf.connectToDnViaHostname); + ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); + if (info != null && + info.getGenerationStamp() >= block.getGenerationStamp() && + info.getNumBytes() > 0) { + syncList.add(new BlockRecord(id, datanode, info)); + } + } catch (RecoveryInProgressException ripE) { + InterDatanodeProtocol.LOG.warn( + "Recovery for replica " + block + " on data-node " + id - + " is already in progress. Recovery id = " - + rBlock.getNewGenerationStamp() + " is aborted.", ripE); ++ + " is already in progress. Recovery id = " ++ + rBlock.getNewGenerationStamp() + " is aborted.", ripE); + return; + } catch (IOException e) { + ++errorCount; + InterDatanodeProtocol.LOG.warn( - "Failed to obtain replica info for block (=" + block - + ") from datanode (=" + id + ")", e); ++ "Failed to obtain replica info for block (=" + block ++ + ") from datanode (=" + id + ")", e); + } + } + + if (errorCount == datanodeids.length) { + throw new IOException("All datanodes failed: block=" + block + + ", datanodeids=" + Arrays.asList(datanodeids)); + } + + syncBlock(rBlock, syncList); + } + + /** + * Get the NameNode corresponding to the given block pool. + * + * @param bpid Block pool Id + * @return Namenode corresponding to the bpid - * @throws IOException if unable to get the corresponding NameNode ++ * @throws IOException if unable to get the corresponding NameNode Block ++ * synchronization + */ + public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid) + throws IOException { + BPOfferService bpos = blockPoolManager.get(bpid); + if (bpos == null) { + throw new IOException("No block pool offer service for bpid=" + bpid); + } - ++ + DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN(); + if (activeNN == null) { + throw new IOException( + "Block pool " + bpid + " has not recognized an active NN"); + } + return activeNN; + } + - /** Block synchronization */ ++ /** ++ * Block synchronization ++ */ + void syncBlock(RecoveringBlock rBlock, - List<BlockRecord> syncList) throws IOException { ++ List<BlockRecord> syncList) throws IOException { + ExtendedBlock block = rBlock.getBlock(); + final String bpid = block.getBlockPoolId(); + DatanodeProtocolClientSideTranslatorPB nn = - getActiveNamenodeForBP(block.getBlockPoolId()); ++ getActiveNamenodeForBP(block.getBlockPoolId()); + + long recoveryId = rBlock.getNewGenerationStamp(); + boolean isTruncateRecovery = rBlock.getNewBlock() != null; + long blockId = (isTruncateRecovery) ? + rBlock.getNewBlock().getBlockId() : block.getBlockId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("block=" + block + ", (length=" + block.getNumBytes() + + "), syncList=" + syncList); + } + + // syncList.isEmpty() means that all data-nodes do not have the block + // or their replicas have 0 length. + // The block can be deleted. + if (syncList.isEmpty()) { + nn.commitBlockSynchronization(block, recoveryId, 0, + true, true, DatanodeID.EMPTY_ARRAY, null); + return; + } + + // Calculate the best available replica state. + ReplicaState bestState = ReplicaState.RWR; + long finalizedLength = -1; - for(BlockRecord r : syncList) { ++ for (BlockRecord r : syncList) { + assert r.rInfo.getNumBytes() > 0 : "zero length replica"; - ReplicaState rState = r.rInfo.getOriginalReplicaState(); - if(rState.getValue() < bestState.getValue()) ++ ReplicaState rState = r.rInfo.getOriginalReplicaState(); ++ if (rState.getValue() < bestState.getValue()) { + bestState = rState; - if(rState == ReplicaState.FINALIZED) { - if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes()) ++ } ++ if (rState == ReplicaState.FINALIZED) { ++ if (finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes()) { + throw new IOException("Inconsistent size of finalized replicas. " + + "Replica " + r.rInfo + " expected size: " + finalizedLength); ++ } + finalizedLength = r.rInfo.getNumBytes(); + } + } + + // Calculate list of nodes that will participate in the recovery + // and the new block size + List<BlockRecord> participatingList = new ArrayList<BlockRecord>(); + final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId, + -1, recoveryId); - switch(bestState) { ++ switch (bestState) { + case FINALIZED: + assert finalizedLength > 0 : "finalizedLength is not positive"; - for(BlockRecord r : syncList) { ++ for (BlockRecord r : syncList) { + ReplicaState rState = r.rInfo.getOriginalReplicaState(); - if(rState == ReplicaState.FINALIZED || - rState == ReplicaState.RBW && - r.rInfo.getNumBytes() == finalizedLength) ++ if (rState == ReplicaState.FINALIZED || ++ rState == ReplicaState.RBW && ++ r.rInfo.getNumBytes() == finalizedLength) { + participatingList.add(r); ++ } + } + newBlock.setNumBytes(finalizedLength); + break; + case RBW: + case RWR: + long minLength = Long.MAX_VALUE; - for(BlockRecord r : syncList) { ++ for (BlockRecord r : syncList) { + ReplicaState rState = r.rInfo.getOriginalReplicaState(); - if(rState == bestState) { ++ if (rState == bestState) { + minLength = Math.min(minLength, r.rInfo.getNumBytes()); + participatingList.add(r); + } + } + newBlock.setNumBytes(minLength); + break; + case RUR: + case TEMPORARY: + assert false : "bad replica state: " + bestState; + } - if(isTruncateRecovery) ++ if (isTruncateRecovery) { + newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes()); ++ } + + List<DatanodeID> failedList = new ArrayList<DatanodeID>(); + final List<BlockRecord> successList = new ArrayList<BlockRecord>(); - for(BlockRecord r : participatingList) { ++ for (BlockRecord r : participatingList) { + try { + r.updateReplicaUnderRecovery(bpid, recoveryId, blockId, + newBlock.getNumBytes()); + successList.add(r); + } catch (IOException e) { + InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" + + newBlock + ", datanode=" + r.id + ")", e); + failedList.add(r.id); + } + } + + // If any of the data-nodes failed, the recovery fails, because + // we never know the actual state of the replica on failed data-nodes. + // The recovery should be started over. - if(!failedList.isEmpty()) { ++ if (!failedList.isEmpty()) { + StringBuilder b = new StringBuilder(); - for(DatanodeID id : failedList) { ++ for (DatanodeID id : failedList) { + b.append("\n " + id); + } + throw new IOException("Cannot recover " + block + ", the following " + + failedList.size() + " data-nodes failed {" + b + "\n}"); + } + + // Notify the name-node about successfully recovered replicas. + final DatanodeID[] datanodes = new DatanodeID[successList.size()]; + final String[] storages = new String[datanodes.length]; - for(int i = 0; i < datanodes.length; i++) { ++ for (int i = 0; i < datanodes.length; i++) { + final BlockRecord r = successList.get(i); + datanodes[i] = r.id; + storages[i] = r.storageID; + } + nn.commitBlockSynchronization(block, + newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false, + datanodes, storages); + } - ++ + private static void logRecoverBlock(String who, RecoveringBlock rb) { + ExtendedBlock block = rb.getBlock(); + DatanodeInfo[] targets = rb.getLocations(); - + LOG.info(who + " calls recoverBlock(" + block + + ", targets=[" + Joiner.on(", ").join(targets) + "]" + + ((rb.getNewBlock() == null) ? ", newGenerationStamp=" - + rb.getNewGenerationStamp() : ", newBlock=" + rb.getNewBlock()) ++ + rb.getNewGenerationStamp() : ", newBlock=" + rb.getNewBlock()) + + ")"); + } + + /** + * Only valid for blocks stored by FsDatasetSpi instances. + */ @Override // ClientDataNodeProtocol public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { checkReadAccess(block);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3100002d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3100002d/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3100002d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java ----------------------------------------------------------------------