HDFS-9255. Consolidate block recovery related implementation into a single class. Contributed by Walter Su.
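[Editor's note] A minimal sketch of the shape this refactoring produces. This is not the Hadoop source: the names BlockRecoveryWorker, recoverBlocks, getBlockRecoveryWorker, and RecoveryTaskContiguous are taken from the diff below, but the scaffolding here is simplified for illustration.

    import java.util.Collection;

    class RecoveringBlock { /* block, locations, new generation stamp */ }

    class BlockRecoveryWorker {
      private final DataNode datanode;  // back-reference, as in the real class

      BlockRecoveryWorker(DataNode datanode) {
        this.datanode = datanode;
      }

      // Spawns a daemon thread that runs one recovery task per block; this
      // mirrors the recoverBlocks() that the change moves out of DataNode.
      Thread recoverBlocks(final String who,
          final Collection<RecoveringBlock> blocks) {
        Thread d = new Thread(() -> {
          for (RecoveringBlock b : blocks) {
            // log, then: new RecoveryTaskContiguous(b).recover();
          }
        });
        d.start();
        return d;
      }
    }

    class DataNode {
      private final BlockRecoveryWorker blockRecoveryWorker =
          new BlockRecoveryWorker(this);

      // Callers (e.g. the heartbeat command dispatcher) now reach recovery
      // through this accessor instead of DataNode-resident methods.
      BlockRecoveryWorker getBlockRecoveryWorker() {
        return blockRecoveryWorker;
      }
    }

Before this change, DataNode itself carried recoverBlocks(), recoverBlock(), syncBlock(), and the BlockRecord helper; after it, DataNode only holds a reference to the dedicated worker, and BPOfferService delegates DNA_RECOVERBLOCK commands to it.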
Change-Id: I7a1c03f50123d79ac0a78c981d9721617e3229d1

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e287e7d1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e287e7d1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e287e7d1

Branch: refs/heads/HDFS-8966
Commit: e287e7d14b838a866ba03d895fa35819999d7c09
Parents: a04b169
Author: Zhe Zhang <[email protected]>
Authored: Wed Oct 28 07:34:06 2015 -0700
Committer: Zhe Zhang <[email protected]>
Committed: Wed Oct 28 07:34:06 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  21 +-
 .../server/blockmanagement/DatanodeManager.java |  10 +-
 .../hdfs/server/datanode/BPOfferService.java    |   3 +-
 .../server/datanode/BlockRecoveryWorker.java    | 330 +++++++++++++++++++
 .../hadoop/hdfs/server/datanode/DataNode.java   | 292 +---------------
 .../hdfs/server/datanode/TestBlockRecovery.java |  74 +++--
 7 files changed, 420 insertions(+), 313 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e287e7d1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7f3052f..184b743 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1596,6 +1596,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9311. Support optional offload of NameNode HA service health checks to
     a separate RPC server. (cnauroth)
 
+    HDFS-9255. Consolidate block recovery related implementation into a single
+    class. (Walter Su via zhz)
+
   OPTIMIZATIONS
 
     HDFS-8026.
Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e287e7d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 05c498f..fd8a386 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto;
@@ -49,13 +50,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailur
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaOptionEntryProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ECSchemaProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ErasureCodingPolicyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockStoragePolicyProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfosProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
@@ -367,11 +363,16 @@ public class PBHelper {
   }
 
   public static RecoveringBlock convert(RecoveringBlockProto b) {
-    ExtendedBlock block = PBHelperClient.convert(b.getBlock().getB());
-    DatanodeInfo[] locs = PBHelperClient.convert(b.getBlock().getLocsList());
-    return (b.hasTruncateBlock()) ?
-        new RecoveringBlock(block, locs, PBHelperClient.convert(b.getTruncateBlock())) :
-        new RecoveringBlock(block, locs, b.getNewGenStamp());
+    LocatedBlock lb = PBHelperClient.convertLocatedBlockProto(b.getBlock());
+    RecoveringBlock rBlock;
+    if (b.hasTruncateBlock()) {
+      rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
+          PBHelperClient.convert(b.getTruncateBlock()));
+    } else {
+      rBlock = new RecoveringBlock(lb.getBlock(), lb.getLocations(),
+          b.getNewGenStamp());
+    }
+    return rBlock;
   }
 
   public static ReplicaState convert(ReplicaStateProto state) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e287e7d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index e30bc2a..b32092d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1390,15 +1390,17 @@ public class DatanodeManager {
           // in block recovery.
           recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
         }
+        RecoveringBlock rBlock;
         if(truncateRecovery) {
           Block recoveryBlock = (copyOnTruncateRecovery) ? b : uc.getTruncateBlock();
-          brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
-              recoveryBlock));
+          rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
+              recoveryBlock);
         } else {
-          brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos,
-              uc.getBlockRecoveryId()));
+          rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
+              uc.getBlockRecoveryId());
         }
+        brCommand.add(rBlock);
       }
       return new DatanodeCommand[] { brCommand };
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e287e7d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 96e74e5..7759818 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -700,7 +700,8 @@ class BPOfferService {
       break;
     case DatanodeProtocol.DNA_RECOVERBLOCK:
       String who = "NameNode at " + actor.getNNSocketAddress();
-      dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+      dn.getBlockRecoveryWorker().recoverBlocks(who,
+          ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
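[Editor's note] Before the new worker class itself, a condensed restatement of the protobuf conversion above may help: a RecoveringBlock carries either a replacement block (truncate recovery) or just a new generation stamp, and the new PBHelper code makes that branch explicit instead of nesting it in a ternary. The sketch below mirrors that logic; the surrounding types are the real HDFS classes referenced in the diff, but the method shown is a restatement, not a copy of the source file.

    // Restatement of the new PBHelper.convert(RecoveringBlockProto) above.
    static RecoveringBlock convert(RecoveringBlockProto b) {
      LocatedBlock lb = PBHelperClient.convertLocatedBlockProto(b.getBlock());
      if (b.hasTruncateBlock()) {
        // Truncate recovery: the NameNode dictates the target block
        // (shorter length, possibly a new block id).
        return new RecoveringBlock(lb.getBlock(), lb.getLocations(),
            PBHelperClient.convert(b.getTruncateBlock()));
      }
      // Ordinary recovery: surviving replicas are synchronized and bumped
      // to a new generation stamp.
      return new RecoveringBlock(lb.getBlock(), lb.getLocations(),
          b.getNewGenStamp());
    }

The DatanodeManager hunk above builds these two flavors of RecoveringBlock on the NameNode side; the conversion here is what the DataNode sees on the wire.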
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e287e7d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
new file mode 100644
index 0000000..42fcf48
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockRecoveryWorker.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.Daemon;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class handles the block recovery work commands.
+ */
+@InterfaceAudience.Private
+public class BlockRecoveryWorker {
+  public static final Log LOG = DataNode.LOG;
+
+  private final DataNode datanode;
+  private final Configuration conf;
+  private final DNConf dnConf;
+
+  BlockRecoveryWorker(DataNode datanode) {
+    this.datanode = datanode;
+    conf = datanode.getConf();
+    dnConf = datanode.getDnConf();
+  }
+
+  /** A convenient class used in block recovery. */
+  static class BlockRecord {
+    private final DatanodeID id;
+    private final InterDatanodeProtocol datanode;
+    private final ReplicaRecoveryInfo rInfo;
+
+    private String storageID;
+
+    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode,
+        ReplicaRecoveryInfo rInfo) {
+      this.id = id;
+      this.datanode = datanode;
+      this.rInfo = rInfo;
+    }
+
+    private void updateReplicaUnderRecovery(String bpid, long recoveryId,
+        long newBlockId, long newLength) throws IOException {
+      final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
+      storageID = datanode.updateReplicaUnderRecovery(b, recoveryId,
+          newBlockId, newLength);
+    }
+
+    @Override
+    public String toString() {
+      return "block:" + rInfo + " node:" + id;
+    }
+  }
+
+  /** A block recovery task for a contiguous block.
*/ + class RecoveryTaskContiguous { + private final RecoveringBlock rBlock; + private final ExtendedBlock block; + private final String bpid; + private final DatanodeInfo[] locs; + private final long recoveryId; + + RecoveryTaskContiguous(RecoveringBlock rBlock) { + this.rBlock = rBlock; + block = rBlock.getBlock(); + bpid = block.getBlockPoolId(); + locs = rBlock.getLocations(); + recoveryId = rBlock.getNewGenerationStamp(); + } + + protected void recover() throws IOException { + List<BlockRecord> syncList = new ArrayList<>(locs.length); + int errorCount = 0; + + //check generation stamps + for(DatanodeID id : locs) { + try { + DatanodeID bpReg =datanode.getBPOfferService(bpid).bpRegistration; + InterDatanodeProtocol proxyDN = bpReg.equals(id)? + datanode: DataNode.createInterDataNodeProtocolProxy(id, conf, + dnConf.socketTimeout, dnConf.connectToDnViaHostname); + ReplicaRecoveryInfo info = callInitReplicaRecovery(proxyDN, rBlock); + if (info != null && + info.getGenerationStamp() >= block.getGenerationStamp() && + info.getNumBytes() > 0) { + syncList.add(new BlockRecord(id, proxyDN, info)); + } + } catch (RecoveryInProgressException ripE) { + InterDatanodeProtocol.LOG.warn( + "Recovery for replica " + block + " on data-node " + id + + " is already in progress. Recovery id = " + + rBlock.getNewGenerationStamp() + " is aborted.", ripE); + return; + } catch (IOException e) { + ++errorCount; + InterDatanodeProtocol.LOG.warn( + "Failed to obtain replica info for block (=" + block + + ") from datanode (=" + id + ")", e); + } + } + + if (errorCount == locs.length) { + throw new IOException("All datanodes failed: block=" + block + + ", datanodeids=" + Arrays.asList(locs)); + } + + syncBlock(syncList); + } + + /** Block synchronization. */ + void syncBlock(List<BlockRecord> syncList) throws IOException { + DatanodeProtocolClientSideTranslatorPB nn = + getActiveNamenodeForBP(block.getBlockPoolId()); + + boolean isTruncateRecovery = rBlock.getNewBlock() != null; + long blockId = (isTruncateRecovery) ? + rBlock.getNewBlock().getBlockId() : block.getBlockId(); + + if (LOG.isDebugEnabled()) { + LOG.debug("block=" + block + ", (length=" + block.getNumBytes() + + "), syncList=" + syncList); + } + + // syncList.isEmpty() means that all data-nodes do not have the block + // or their replicas have 0 length. + // The block can be deleted. + if (syncList.isEmpty()) { + nn.commitBlockSynchronization(block, recoveryId, 0, + true, true, DatanodeID.EMPTY_ARRAY, null); + return; + } + + // Calculate the best available replica state. + ReplicaState bestState = ReplicaState.RWR; + long finalizedLength = -1; + for (BlockRecord r : syncList) { + assert r.rInfo.getNumBytes() > 0 : "zero length replica"; + ReplicaState rState = r.rInfo.getOriginalReplicaState(); + if (rState.getValue() < bestState.getValue()) { + bestState = rState; + } + if(rState == ReplicaState.FINALIZED) { + if (finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes()) { + throw new IOException("Inconsistent size of finalized replicas. 
" + + "Replica " + r.rInfo + " expected size: " + finalizedLength); + } + finalizedLength = r.rInfo.getNumBytes(); + } + } + + // Calculate list of nodes that will participate in the recovery + // and the new block size + List<BlockRecord> participatingList = new ArrayList<>(); + final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId, + -1, recoveryId); + switch(bestState) { + case FINALIZED: + assert finalizedLength > 0 : "finalizedLength is not positive"; + for(BlockRecord r : syncList) { + ReplicaState rState = r.rInfo.getOriginalReplicaState(); + if (rState == ReplicaState.FINALIZED || + rState == ReplicaState.RBW && + r.rInfo.getNumBytes() == finalizedLength) { + participatingList.add(r); + } + } + newBlock.setNumBytes(finalizedLength); + break; + case RBW: + case RWR: + long minLength = Long.MAX_VALUE; + for(BlockRecord r : syncList) { + ReplicaState rState = r.rInfo.getOriginalReplicaState(); + if(rState == bestState) { + minLength = Math.min(minLength, r.rInfo.getNumBytes()); + participatingList.add(r); + } + } + newBlock.setNumBytes(minLength); + break; + case RUR: + case TEMPORARY: + assert false : "bad replica state: " + bestState; + default: + break; // we have 'case' all enum values + } + if (isTruncateRecovery) { + newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes()); + } + + List<DatanodeID> failedList = new ArrayList<>(); + final List<BlockRecord> successList = new ArrayList<>(); + for (BlockRecord r : participatingList) { + try { + r.updateReplicaUnderRecovery(bpid, recoveryId, blockId, + newBlock.getNumBytes()); + successList.add(r); + } catch (IOException e) { + InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" + + newBlock + ", datanode=" + r.id + ")", e); + failedList.add(r.id); + } + } + + // If any of the data-nodes failed, the recovery fails, because + // we never know the actual state of the replica on failed data-nodes. + // The recovery should be started over. + if (!failedList.isEmpty()) { + StringBuilder b = new StringBuilder(); + for(DatanodeID id : failedList) { + b.append("\n " + id); + } + throw new IOException("Cannot recover " + block + ", the following " + + failedList.size() + " data-nodes failed {" + b + "\n}"); + } + + // Notify the name-node about successfully recovered replicas. + final DatanodeID[] datanodes = new DatanodeID[successList.size()]; + final String[] storages = new String[datanodes.length]; + for (int i = 0; i < datanodes.length; i++) { + final BlockRecord r = successList.get(i); + datanodes[i] = r.id; + storages[i] = r.storageID; + } + nn.commitBlockSynchronization(block, + newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false, + datanodes, storages); + } + } + + private static void logRecoverBlock(String who, RecoveringBlock rb) { + ExtendedBlock block = rb.getBlock(); + DatanodeInfo[] targets = rb.getLocations(); + + LOG.info(who + " calls recoverBlock(" + block + + ", targets=[" + Joiner.on(", ").join(targets) + "]" + + ", newGenerationStamp=" + rb.getNewGenerationStamp() + + ", newBlock=" + rb.getNewBlock() + + ", isStriped=" + rb.isStriped() + + ")"); + } + + /** + * Convenience method, which unwraps RemoteException. + * @throws IOException not a RemoteException. 
+ */ + private static ReplicaRecoveryInfo callInitReplicaRecovery( + InterDatanodeProtocol datanode, RecoveringBlock rBlock) + throws IOException { + try { + return datanode.initReplicaRecovery(rBlock); + } catch(RemoteException re) { + throw re.unwrapRemoteException(); + } + } + + /** + * Get the NameNode corresponding to the given block pool. + * + * @param bpid Block pool Id + * @return Namenode corresponding to the bpid + * @throws IOException if unable to get the corresponding NameNode + */ + DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP( + String bpid) throws IOException { + BPOfferService bpos = datanode.getBPOfferService(bpid); + if (bpos == null) { + throw new IOException("No block pool offer service for bpid=" + bpid); + } + + DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN(); + if (activeNN == null) { + throw new IOException( + "Block pool " + bpid + " has not recognized an active NN"); + } + return activeNN; + } + + public Daemon recoverBlocks(final String who, + final Collection<RecoveringBlock> blocks) { + Daemon d = new Daemon(datanode.threadGroup, new Runnable() { + @Override + public void run() { + for(RecoveringBlock b : blocks) { + try { + logRecoverBlock(who, b); + RecoveryTaskContiguous task = new RecoveryTaskContiguous(b); + task.recover(); + } catch (IOException e) { + LOG.warn("recoverBlocks FAILED: " + b, e); + } + } + } + }); + d.start(); + return d; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e287e7d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c4c5bbb..29bcd79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -123,7 +123,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; @@ -174,7 +173,6 @@ import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.DNS; @@ -368,6 +366,7 @@ public class DataNode extends ReconfigurableBase private String supergroup; private boolean isPermissionEnabled; private String dnUserName = null; + private BlockRecoveryWorker blockRecoveryWorker; private ErasureCodingWorker ecWorker; private final Tracer tracer; private final TracerConfigurationManager tracerConfigurationManager; @@ -706,7 +705,7 @@ public class DataNode extends ReconfigurableBase /** * Remove volumes from DataNode. 
- * See {@link removeVolumes(final Set<File>, boolean)} for details. + * See {@link #removeVolumes(Set, boolean)} for details. * * @param locations the StorageLocations of the volumes to be removed. * @throws IOException @@ -730,7 +729,7 @@ public class DataNode extends ReconfigurableBase * <li> * <ul>Remove volumes and block info from FsDataset.</ul> * <ul>Remove volumes from DataStorage.</ul> - * <ul>Reset configuration DATA_DIR and {@link dataDirs} to represent + * <ul>Reset configuration DATA_DIR and {@link #dataDirs} to represent * active volumes.</ul> * </li> * @param absoluteVolumePaths the absolute path of volumes. @@ -856,7 +855,6 @@ public class DataNode extends ReconfigurableBase } } } - private void initIpcServer(Configuration conf) throws IOException { InetSocketAddress ipcAddr = NetUtils.createSocketAddr( @@ -1104,8 +1102,6 @@ public class DataNode extends ReconfigurableBase bpos.trySendErrorReport(errCode, errMsg); } - - /** * Return the BPOfferService instance corresponding to the given block. * @return the BPOS @@ -1122,8 +1118,6 @@ public class DataNode extends ReconfigurableBase return bpos; } - - // used only for testing @VisibleForTesting void setHeartbeatsDisabledForTests( @@ -1215,7 +1209,10 @@ public class DataNode extends ReconfigurableBase metrics = DataNodeMetrics.create(conf, getDisplayName()); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); - + + ecWorker = new ErasureCodingWorker(conf, this); + blockRecoveryWorker = new BlockRecoveryWorker(this); + blockPoolManager = new BlockPoolManager(this); blockPoolManager.refreshNamenodes(conf); @@ -1225,8 +1222,6 @@ public class DataNode extends ReconfigurableBase saslClient = new SaslDataTransferClient(dnConf.conf, dnConf.saslPropsResolver, dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); - // Initialize ErasureCoding worker - ecWorker = new ErasureCodingWorker(conf, this); startMetricsLogger(conf); } @@ -1450,6 +1445,10 @@ public class DataNode extends ReconfigurableBase List<BPOfferService> getAllBpOs() { return blockPoolManager.getAllNamenodeThreads(); } + + BPOfferService getBPOfferService(String bpid){ + return blockPoolManager.get(bpid); + } int getBpOsCount() { return blockPoolManager.getAllNamenodeThreads().size(); @@ -2626,50 +2625,14 @@ public class DataNode extends ReconfigurableBase secureMain(args, null); } - public Daemon recoverBlocks( - final String who, - final Collection<RecoveringBlock> blocks) { - - Daemon d = new Daemon(threadGroup, new Runnable() { - /** Recover a list of blocks. It is run by the primary datanode. */ - @Override - public void run() { - for(RecoveringBlock b : blocks) { - try { - logRecoverBlock(who, b); - recoverBlock(b); - } catch (IOException e) { - LOG.warn("recoverBlocks FAILED: " + b, e); - } - } - } - }); - d.start(); - return d; - } - // InterDataNodeProtocol implementation @Override // InterDatanodeProtocol public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) - throws IOException { + throws IOException { return data.initReplicaRecovery(rBlock); } /** - * Convenience method, which unwraps RemoteException. - * @throws IOException not a RemoteException. 
- */ - private static ReplicaRecoveryInfo callInitReplicaRecovery( - InterDatanodeProtocol datanode, - RecoveringBlock rBlock) throws IOException { - try { - return datanode.initReplicaRecovery(rBlock); - } catch(RemoteException re) { - throw re.unwrapRemoteException(); - } - } - - /** * Update replica with the new generation stamp and length. */ @Override // InterDatanodeProtocol @@ -2689,231 +2652,6 @@ public class DataNode extends ReconfigurableBase return storageID; } - /** A convenient class used in block recovery */ - static class BlockRecord { - final DatanodeID id; - final InterDatanodeProtocol datanode; - final ReplicaRecoveryInfo rInfo; - - private String storageID; - - BlockRecord(DatanodeID id, - InterDatanodeProtocol datanode, - ReplicaRecoveryInfo rInfo) { - this.id = id; - this.datanode = datanode; - this.rInfo = rInfo; - } - - void updateReplicaUnderRecovery(String bpid, long recoveryId, - long newBlockId, long newLength) - throws IOException { - final ExtendedBlock b = new ExtendedBlock(bpid, rInfo); - storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newBlockId, - newLength); - } - - @Override - public String toString() { - return "block:" + rInfo + " node:" + id; - } - } - - /** Recover a block */ - private void recoverBlock(RecoveringBlock rBlock) throws IOException { - ExtendedBlock block = rBlock.getBlock(); - String blookPoolId = block.getBlockPoolId(); - DatanodeID[] datanodeids = rBlock.getLocations(); - List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length); - int errorCount = 0; - - //check generation stamps - for(DatanodeID id : datanodeids) { - try { - BPOfferService bpos = blockPoolManager.get(blookPoolId); - DatanodeRegistration bpReg = bpos.bpRegistration; - InterDatanodeProtocol datanode = bpReg.equals(id)? - this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), - dnConf.socketTimeout, dnConf.connectToDnViaHostname); - ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); - if (info != null && - info.getGenerationStamp() >= block.getGenerationStamp() && - info.getNumBytes() > 0) { - syncList.add(new BlockRecord(id, datanode, info)); - } - } catch (RecoveryInProgressException ripE) { - InterDatanodeProtocol.LOG.warn( - "Recovery for replica " + block + " on data-node " + id - + " is already in progress. Recovery id = " - + rBlock.getNewGenerationStamp() + " is aborted.", ripE); - return; - } catch (IOException e) { - ++errorCount; - InterDatanodeProtocol.LOG.warn( - "Failed to obtain replica info for block (=" + block - + ") from datanode (=" + id + ")", e); - } - } - - if (errorCount == datanodeids.length) { - throw new IOException("All datanodes failed: block=" + block - + ", datanodeids=" + Arrays.asList(datanodeids)); - } - - syncBlock(rBlock, syncList); - } - - /** - * Get the NameNode corresponding to the given block pool. 
- * - * @param bpid Block pool Id - * @return Namenode corresponding to the bpid - * @throws IOException if unable to get the corresponding NameNode - */ - public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid) - throws IOException { - BPOfferService bpos = blockPoolManager.get(bpid); - if (bpos == null) { - throw new IOException("No block pool offer service for bpid=" + bpid); - } - - DatanodeProtocolClientSideTranslatorPB activeNN = bpos.getActiveNN(); - if (activeNN == null) { - throw new IOException( - "Block pool " + bpid + " has not recognized an active NN"); - } - return activeNN; - } - - /** Block synchronization */ - void syncBlock(RecoveringBlock rBlock, - List<BlockRecord> syncList) throws IOException { - ExtendedBlock block = rBlock.getBlock(); - final String bpid = block.getBlockPoolId(); - DatanodeProtocolClientSideTranslatorPB nn = - getActiveNamenodeForBP(block.getBlockPoolId()); - - long recoveryId = rBlock.getNewGenerationStamp(); - boolean isTruncateRecovery = rBlock.getNewBlock() != null; - long blockId = (isTruncateRecovery) ? - rBlock.getNewBlock().getBlockId() : block.getBlockId(); - - if (LOG.isDebugEnabled()) { - LOG.debug("block=" + block + ", (length=" + block.getNumBytes() - + "), syncList=" + syncList); - } - - // syncList.isEmpty() means that all data-nodes do not have the block - // or their replicas have 0 length. - // The block can be deleted. - if (syncList.isEmpty()) { - nn.commitBlockSynchronization(block, recoveryId, 0, - true, true, DatanodeID.EMPTY_ARRAY, null); - return; - } - - // Calculate the best available replica state. - ReplicaState bestState = ReplicaState.RWR; - long finalizedLength = -1; - for(BlockRecord r : syncList) { - assert r.rInfo.getNumBytes() > 0 : "zero length replica"; - ReplicaState rState = r.rInfo.getOriginalReplicaState(); - if(rState.getValue() < bestState.getValue()) - bestState = rState; - if(rState == ReplicaState.FINALIZED) { - if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes()) - throw new IOException("Inconsistent size of finalized replicas. 
" + - "Replica " + r.rInfo + " expected size: " + finalizedLength); - finalizedLength = r.rInfo.getNumBytes(); - } - } - - // Calculate list of nodes that will participate in the recovery - // and the new block size - List<BlockRecord> participatingList = new ArrayList<BlockRecord>(); - final ExtendedBlock newBlock = new ExtendedBlock(bpid, blockId, - -1, recoveryId); - switch(bestState) { - case FINALIZED: - assert finalizedLength > 0 : "finalizedLength is not positive"; - for(BlockRecord r : syncList) { - ReplicaState rState = r.rInfo.getOriginalReplicaState(); - if(rState == ReplicaState.FINALIZED || - rState == ReplicaState.RBW && - r.rInfo.getNumBytes() == finalizedLength) - participatingList.add(r); - } - newBlock.setNumBytes(finalizedLength); - break; - case RBW: - case RWR: - long minLength = Long.MAX_VALUE; - for(BlockRecord r : syncList) { - ReplicaState rState = r.rInfo.getOriginalReplicaState(); - if(rState == bestState) { - minLength = Math.min(minLength, r.rInfo.getNumBytes()); - participatingList.add(r); - } - } - newBlock.setNumBytes(minLength); - break; - case RUR: - case TEMPORARY: - assert false : "bad replica state: " + bestState; - } - if(isTruncateRecovery) - newBlock.setNumBytes(rBlock.getNewBlock().getNumBytes()); - - List<DatanodeID> failedList = new ArrayList<DatanodeID>(); - final List<BlockRecord> successList = new ArrayList<BlockRecord>(); - for(BlockRecord r : participatingList) { - try { - r.updateReplicaUnderRecovery(bpid, recoveryId, blockId, - newBlock.getNumBytes()); - successList.add(r); - } catch (IOException e) { - InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" - + newBlock + ", datanode=" + r.id + ")", e); - failedList.add(r.id); - } - } - - // If any of the data-nodes failed, the recovery fails, because - // we never know the actual state of the replica on failed data-nodes. - // The recovery should be started over. - if(!failedList.isEmpty()) { - StringBuilder b = new StringBuilder(); - for(DatanodeID id : failedList) { - b.append("\n " + id); - } - throw new IOException("Cannot recover " + block + ", the following " - + failedList.size() + " data-nodes failed {" + b + "\n}"); - } - - // Notify the name-node about successfully recovered replicas. - final DatanodeID[] datanodes = new DatanodeID[successList.size()]; - final String[] storages = new String[datanodes.length]; - for(int i = 0; i < datanodes.length; i++) { - final BlockRecord r = successList.get(i); - datanodes[i] = r.id; - storages[i] = r.storageID; - } - nn.commitBlockSynchronization(block, - newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false, - datanodes, storages); - } - - private static void logRecoverBlock(String who, RecoveringBlock rb) { - ExtendedBlock block = rb.getBlock(); - DatanodeInfo[] targets = rb.getLocations(); - - LOG.info(who + " calls recoverBlock(" + block - + ", targets=[" + Joiner.on(", ").join(targets) + "]" - + ((rb.getNewBlock() == null) ? 
", newGenerationStamp=" - + rb.getNewGenerationStamp() : ", newBlock=" + rb.getNewBlock()) - + ")"); - } - @Override // ClientDataNodeProtocol public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { checkReadAccess(block); @@ -3337,7 +3075,11 @@ public class DataNode extends ReconfigurableBase checkSuperuserPrivilege(); tracerConfigurationManager.removeSpanReceiver(id); } - + + public BlockRecoveryWorker getBlockRecoveryWorker(){ + return blockRecoveryWorker; + } + public ErasureCodingWorker getErasureCodingWorker(){ return ecWorker; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e287e7d1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 92eb389..f60c973 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -64,7 +64,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; -import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord; +import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker.BlockRecord; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -79,7 +79,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.log4j.Level; import org.junit.After; @@ -98,6 +97,8 @@ public class TestBlockRecovery { private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory() + "data"; private DataNode dn; + private DataNode spyDN; + private BlockRecoveryWorker recoveryWorker; private Configuration conf; private boolean tearDownDone; private final static long RECOVERY_ID = 3000L; @@ -179,6 +180,8 @@ public class TestBlockRecovery { }; // Trigger a heartbeat so that it acknowledges the NN as active. 
dn.getAllBpOs().get(0).triggerHeartbeatForTests(); + spyDN = spy(dn); + recoveryWorker = new BlockRecoveryWorker(spyDN); } /** @@ -225,7 +228,10 @@ public class TestBlockRecovery { anyLong(), anyLong())).thenReturn("storage1"); when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), anyLong(), anyLong())).thenReturn("storage2"); - dn.syncBlock(rBlock, syncList); + + BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = + recoveryWorker.new RecoveryTaskContiguous(rBlock); + RecoveryTaskContiguous.syncBlock(syncList); } /** @@ -446,13 +452,17 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - DataNode spyDN = spy(dn); doThrow(new RecoveryInProgressException("Replica recovery is in progress")). when(spyDN).initReplicaRecovery(any(RecoveringBlock.class)); - Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks()); - d.join(); - verify(spyDN, never()).syncBlock( - any(RecoveringBlock.class), anyListOf(BlockRecord.class)); + + for(RecoveringBlock rBlock: initRecoveringBlocks()){ + BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = + recoveryWorker.new RecoveryTaskContiguous(rBlock); + BlockRecoveryWorker.RecoveryTaskContiguous spyTask + = spy(RecoveryTaskContiguous); + spyTask.recover(); + verify(spyTask, never()).syncBlock(anyListOf(BlockRecord.class)); + } } /** @@ -466,13 +476,21 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - DataNode spyDN = spy(dn); doThrow(new IOException()). when(spyDN).initReplicaRecovery(any(RecoveringBlock.class)); - Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks()); - d.join(); - verify(spyDN, never()).syncBlock( - any(RecoveringBlock.class), anyListOf(BlockRecord.class)); + + for(RecoveringBlock rBlock: initRecoveringBlocks()){ + BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = + recoveryWorker.new RecoveryTaskContiguous(rBlock); + BlockRecoveryWorker.RecoveryTaskContiguous spyTask = spy(RecoveryTaskContiguous); + try { + spyTask.recover(); + fail(); + } catch(IOException e){ + GenericTestUtils.assertExceptionContains("All datanodes failed", e); + } + verify(spyTask, never()).syncBlock(anyListOf(BlockRecord.class)); + } } /** @@ -485,13 +503,18 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - DataNode spyDN = spy(dn); doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0, block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN). 
initReplicaRecovery(any(RecoveringBlock.class)); - Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks()); - d.join(); - DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID); + + for(RecoveringBlock rBlock: initRecoveringBlocks()){ + BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = + recoveryWorker.new RecoveryTaskContiguous(rBlock); + BlockRecoveryWorker.RecoveryTaskContiguous spyTask + = spy(RecoveryTaskContiguous); + spyTask.recover(); + } + DatanodeProtocol dnP = recoveryWorker.getActiveNamenodeForBP(POOL_ID); verify(dnP).commitBlockSynchronization( block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null); } @@ -520,11 +543,12 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - DataNode spyDN = spy(dn); doThrow(new IOException()).when(spyDN).updateReplicaUnderRecovery( block, RECOVERY_ID, BLOCK_ID, block.getNumBytes()); try { - spyDN.syncBlock(rBlock, initBlockRecords(spyDN)); + BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = + recoveryWorker.new RecoveryTaskContiguous(rBlock); + RecoveryTaskContiguous.syncBlock(initBlockRecords(spyDN)); fail("Sync should fail"); } catch (IOException e) { e.getMessage().startsWith("Cannot recover "); @@ -542,13 +566,15 @@ public class TestBlockRecovery { LOG.debug("Running " + GenericTestUtils.getMethodName()); } dn.data.createRbw(StorageType.DEFAULT, block, false); + BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = + recoveryWorker.new RecoveryTaskContiguous(rBlock); try { - dn.syncBlock(rBlock, initBlockRecords(dn)); + RecoveryTaskContiguous.syncBlock(initBlockRecords(dn)); fail("Sync should fail"); } catch (IOException e) { e.getMessage().startsWith("Cannot recover "); } - DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); + DatanodeProtocol namenode = recoveryWorker.getActiveNamenodeForBP(POOL_ID); verify(namenode, never()).commitBlockSynchronization( any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), anyBoolean(), any(DatanodeID[].class), any(String[].class)); @@ -572,13 +598,15 @@ public class TestBlockRecovery { DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); streams.getChecksumOut().write('a'); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); + BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = + recoveryWorker.new RecoveryTaskContiguous(rBlock); try { - dn.syncBlock(rBlock, initBlockRecords(dn)); + RecoveryTaskContiguous.syncBlock(initBlockRecords(dn)); fail("Sync should fail"); } catch (IOException e) { e.getMessage().startsWith("Cannot recover "); } - DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); + DatanodeProtocol namenode = recoveryWorker.getActiveNamenodeForBP(POOL_ID); verify(namenode, never()).commitBlockSynchronization( any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), anyBoolean(), any(DatanodeID[].class), any(String[].class));
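[Editor's note] As a closing aid, the core decision inside syncBlock() — now owned by RecoveryTaskContiguous above — is choosing the recovery length from the best replica state available. The sketch below is a condensed, self-contained restatement of that rule; Replica and SyncPlan are stand-in types, not HDFS classes, and the enum ordering mirrors the best-to-worst comparison the real code performs via ReplicaState values. Truncating to the shortest replica in the best state guarantees every participating replica can serve the agreed length.

    import java.util.List;

    enum ReplicaState { FINALIZED, RBW, RWR }  // best to worst, as in HDFS

    class Replica {
      final ReplicaState state;
      final long numBytes;
      Replica(ReplicaState state, long numBytes) {
        this.state = state;
        this.numBytes = numBytes;
      }
    }

    class SyncPlan {
      // Assumes syncList is non-empty with positive lengths; the real code
      // handles the empty case earlier by committing the block with length 0.
      static long chooseNewLength(List<Replica> syncList) {
        // Best available state = smallest ordinal present (FINALIZED wins).
        ReplicaState best = ReplicaState.RWR;
        long finalizedLength = -1;
        for (Replica r : syncList) {
          if (r.state.ordinal() < best.ordinal()) {
            best = r.state;
          }
          if (r.state == ReplicaState.FINALIZED) {
            // All finalized copies must agree on the length.
            if (finalizedLength > 0 && finalizedLength != r.numBytes) {
              throw new IllegalStateException("Inconsistent finalized sizes");
            }
            finalizedLength = r.numBytes;
          }
        }
        if (best == ReplicaState.FINALIZED) {
          return finalizedLength;
        }
        // RBW/RWR: truncate everyone to the shortest replica in that state.
        long min = Long.MAX_VALUE;
        for (Replica r : syncList) {
          if (r.state == best) {
            min = Math.min(min, r.numBytes);
          }
        }
        return min;
      }
    }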
