HDFS-9731. Erasure Coding: Rename BlockECRecoveryCommand to BlockECReconstructionCommand. Contributed by Rakesh R.
Change-Id: I405365a8395770e494b92bfe9651f4f0366d8f28 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ae543fd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ae543fd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ae543fd Branch: refs/heads/trunk Commit: 4ae543fdcd6dcfbe32257b1e72a405df9aa73e17 Parents: 913676d Author: zhezhang <[email protected]> Authored: Tue Feb 2 12:31:43 2016 -0800 Committer: zhezhang <[email protected]> Committed: Tue Feb 2 12:32:08 2016 -0800 ---------------------------------------------------------------------- .../src/main/proto/erasurecoding.proto | 4 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 18 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 90 ++-- .../blockmanagement/DatanodeDescriptor.java | 11 +- .../server/blockmanagement/DatanodeManager.java | 12 +- .../blockmanagement/ErasureCodingWork.java | 3 +- .../hdfs/server/datanode/BPOfferService.java | 9 +- .../erasurecode/ErasureCodingWorker.java | 197 ++++----- .../protocol/BlockECReconstructionCommand.java | 148 +++++++ .../server/protocol/BlockECRecoveryCommand.java | 147 ------- .../hdfs/server/protocol/DatanodeProtocol.java | 2 +- .../src/main/proto/DatanodeProtocol.proto | 10 +- .../src/main/resources/hdfs-default.xml | 19 +- .../hadoop/hdfs/TestReconstructStripedFile.java | 426 +++++++++++++++++++ .../hadoop/hdfs/TestRecoverStripedFile.java | 425 ------------------ .../hadoop/hdfs/protocolPB/TestPBHelper.java | 28 +- .../namenode/TestReconstructStripedBlocks.java | 239 +++++++++++ .../namenode/TestRecoverStripedBlocks.java | 238 ----------- 19 files changed, 1030 insertions(+), 999 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto index d73c208..4bb44fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/erasurecoding.proto @@ -47,9 +47,9 @@ message GetErasureCodingPolicyResponseProto { } /** - * Block erasure coding recovery info + * Block erasure coding reconstruction info */ -message BlockECRecoveryInfoProto { +message BlockECReconstructionInfoProto { required ExtendedBlockProto block = 1; required DatanodeInfosProto sourceDnInfos = 2; required DatanodeInfosProto targetDnInfos = 3; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0c9ab6f..1b04947 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -912,6 +912,9 @@ Trunk (Unreleased) HDFS-9659. EditLogTailerThread to Active Namenode RPC should timeout (surendra singh lilhore via vinayakumarb) + HDFS-9731. Erasure Coding: Rename BlockECRecoveryCommand to + BlockECReconstructionCommand. 
(Rakesh R via zhz) + Release 2.9.0 - UNRELEASED INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 76915cb..df205db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -465,14 +465,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600; public static final String DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads"; public static final int DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1; - public static final String DFS_DATANODE_STRIPED_READ_THREADS_KEY = "dfs.datanode.stripedread.threads"; - public static final int DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT = 20; - public static final String DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.stripedread.buffer.size"; - public static final int DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024; - public static final String DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY = "dfs.datanode.stripedread.timeout.millis"; - public static final int DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s - public static final String DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY = "dfs.datanode.striped.blockrecovery.threads.size"; - public static final int DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT = 8; + + public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedread.threads"; + public static final int 
DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT = 20; + public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY = "dfs.datanode.ec.reconstruction.stripedread.buffer.size"; + public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT = 64 * 1024; + public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY = "dfs.datanode.ec.reconstruction.stripedread.timeout.millis"; + public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT = 5000; //5s + public static final String DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY = "dfs.datanode.ec.reconstruction.stripedblock.threads.size"; + public static final int DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_DEFAULT = 8; + public static final String DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY = "dfs.datanode.directoryscan.throttle.limit.ms.per.sec"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index e70cdf0..52ac5d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECRecoveryCommandProto; +import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockECReconstructionCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto; @@ -48,7 +48,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDele import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; -import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECRecoveryInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; @@ -82,10 +82,10 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand; import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import 
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; @@ -453,8 +453,8 @@ public class PBHelper { return REG_CMD; case BlockIdCommand: return PBHelper.convert(proto.getBlkIdCmd()); - case BlockECRecoveryCommand: - return PBHelper.convert(proto.getBlkECRecoveryCmd()); + case BlockECReconstructionCommand: + return PBHelper.convert(proto.getBlkECReconstructionCmd()); default: return null; } @@ -584,10 +584,10 @@ public class PBHelper { builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand). setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand)); break; - case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: - builder.setCmdType(DatanodeCommandProto.Type.BlockECRecoveryCommand) - .setBlkECRecoveryCmd( - convert((BlockECRecoveryCommand) datanodeCommand)); + case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: + builder.setCmdType(DatanodeCommandProto.Type.BlockECReconstructionCommand) + .setBlkECReconstructionCmd( + convert((BlockECReconstructionCommand) datanodeCommand)); break; case DatanodeProtocol.DNA_UNKNOWN: //Not expected default: @@ -873,42 +873,42 @@ public class PBHelper { return storageUuids; } - public static BlockECRecoveryInfo convertBlockECRecoveryInfo( - BlockECRecoveryInfoProto blockEcRecoveryInfoProto) { - ExtendedBlockProto blockProto = blockEcRecoveryInfoProto.getBlock(); + public static BlockECReconstructionInfo convertBlockECReconstructionInfo( + BlockECReconstructionInfoProto blockEcReconstructionInfoProto) { + ExtendedBlockProto blockProto = blockEcReconstructionInfoProto.getBlock(); ExtendedBlock block = PBHelperClient.convert(blockProto); - DatanodeInfosProto sourceDnInfosProto = blockEcRecoveryInfoProto + DatanodeInfosProto sourceDnInfosProto = blockEcReconstructionInfoProto .getSourceDnInfos(); DatanodeInfo[] sourceDnInfos = PBHelperClient.convert(sourceDnInfosProto); - DatanodeInfosProto targetDnInfosProto = 
blockEcRecoveryInfoProto + DatanodeInfosProto targetDnInfosProto = blockEcReconstructionInfoProto .getTargetDnInfos(); DatanodeInfo[] targetDnInfos = PBHelperClient.convert(targetDnInfosProto); - HdfsProtos.StorageUuidsProto targetStorageUuidsProto = blockEcRecoveryInfoProto - .getTargetStorageUuids(); + HdfsProtos.StorageUuidsProto targetStorageUuidsProto = + blockEcReconstructionInfoProto.getTargetStorageUuids(); String[] targetStorageUuids = convert(targetStorageUuidsProto); - StorageTypesProto targetStorageTypesProto = blockEcRecoveryInfoProto + StorageTypesProto targetStorageTypesProto = blockEcReconstructionInfoProto .getTargetStorageTypes(); StorageType[] convertStorageTypes = PBHelperClient.convertStorageTypes( targetStorageTypesProto.getStorageTypesList(), targetStorageTypesProto .getStorageTypesList().size()); - byte[] liveBlkIndices = blockEcRecoveryInfoProto.getLiveBlockIndices() + byte[] liveBlkIndices = blockEcReconstructionInfoProto.getLiveBlockIndices() .toByteArray(); ErasureCodingPolicy ecPolicy = PBHelperClient.convertErasureCodingPolicy( - blockEcRecoveryInfoProto.getEcPolicy()); - return new BlockECRecoveryInfo(block, sourceDnInfos, targetDnInfos, + blockEcReconstructionInfoProto.getEcPolicy()); + return new BlockECReconstructionInfo(block, sourceDnInfos, targetDnInfos, targetStorageUuids, convertStorageTypes, liveBlkIndices, ecPolicy); } - public static BlockECRecoveryInfoProto convertBlockECRecoveryInfo( - BlockECRecoveryInfo blockEcRecoveryInfo) { - BlockECRecoveryInfoProto.Builder builder = BlockECRecoveryInfoProto - .newBuilder(); + public static BlockECReconstructionInfoProto convertBlockECRecoveryInfo( + BlockECReconstructionInfo blockEcRecoveryInfo) { + BlockECReconstructionInfoProto.Builder builder = + BlockECReconstructionInfoProto.newBuilder(); builder.setBlock(PBHelperClient.convert( blockEcRecoveryInfo.getExtendedBlock())); @@ -934,29 +934,31 @@ public class PBHelper { return builder.build(); } - public static 
BlockECRecoveryCommandProto convert( - BlockECRecoveryCommand blkECRecoveryCmd) { - BlockECRecoveryCommandProto.Builder builder = BlockECRecoveryCommandProto - .newBuilder(); - Collection<BlockECRecoveryInfo> blockECRecoveryInfos = blkECRecoveryCmd - .getECTasks(); - for (BlockECRecoveryInfo blkECRecoveryInfo : blockECRecoveryInfos) { - builder - .addBlockECRecoveryinfo(convertBlockECRecoveryInfo(blkECRecoveryInfo)); + public static BlockECReconstructionCommandProto convert( + BlockECReconstructionCommand blkECReconstructionCmd) { + BlockECReconstructionCommandProto.Builder builder = + BlockECReconstructionCommandProto.newBuilder(); + Collection<BlockECReconstructionInfo> blockECRInfos = + blkECReconstructionCmd.getECTasks(); + for (BlockECReconstructionInfo blkECReconstructInfo : blockECRInfos) { + builder.addBlockECReconstructioninfo( + convertBlockECRecoveryInfo(blkECReconstructInfo)); } return builder.build(); } - public static BlockECRecoveryCommand convert( - BlockECRecoveryCommandProto blkECRecoveryCmdProto) { - Collection<BlockECRecoveryInfo> blkECRecoveryInfos = new ArrayList<>(); - List<BlockECRecoveryInfoProto> blockECRecoveryinfoList = blkECRecoveryCmdProto - .getBlockECRecoveryinfoList(); - for (BlockECRecoveryInfoProto blockECRecoveryInfoProto : blockECRecoveryinfoList) { - blkECRecoveryInfos - .add(convertBlockECRecoveryInfo(blockECRecoveryInfoProto)); + public static BlockECReconstructionCommand convert( + BlockECReconstructionCommandProto blkECReconstructionCmdProto) { + Collection<BlockECReconstructionInfo> blkECReconstructionInfos = + new ArrayList<>(); + List<BlockECReconstructionInfoProto> blkECRInfoList = + blkECReconstructionCmdProto.getBlockECReconstructioninfoList(); + for (BlockECReconstructionInfoProto blkECRInfoProto : blkECRInfoList) { + blkECReconstructionInfos + .add(convertBlockECReconstructionInfo(blkECRInfoProto)); } - return new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY, - blkECRecoveryInfos); + return 
new BlockECReconstructionCommand( + DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION, + blkECReconstructionInfos); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 46f3738..9e7ab20 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; @@ -204,7 +204,7 @@ public class DatanodeDescriptor extends DatanodeInfo { private final BlockQueue<BlockTargetPair> replicateBlocks = new BlockQueue<>(); /** A queue of blocks to be erasure coded by this datanode */ - private final BlockQueue<BlockECRecoveryInfo> erasurecodeBlocks = + private final BlockQueue<BlockECReconstructionInfo> erasurecodeBlocks = new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ private final 
BlockQueue<BlockInfo> recoverBlocks = new BlockQueue<>(); @@ -605,8 +605,8 @@ public class DatanodeDescriptor extends DatanodeInfo { DatanodeDescriptor[] sources, DatanodeStorageInfo[] targets, byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { assert (block != null && sources != null && sources.length > 0); - BlockECRecoveryInfo task = new BlockECRecoveryInfo(block, sources, targets, - liveBlockIndices, ecPolicy); + BlockECReconstructionInfo task = new BlockECReconstructionInfo(block, + sources, targets, liveBlockIndices, ecPolicy); erasurecodeBlocks.offer(task); BlockManager.LOG.debug("Adding block recovery task " + task + "to " + getName() + ", current queue size is " + erasurecodeBlocks.size()); @@ -655,7 +655,8 @@ public class DatanodeDescriptor extends DatanodeInfo { return replicateBlocks.poll(maxTransfers); } - public List<BlockECRecoveryInfo> getErasureCodeCommand(int maxTransfers) { + public List<BlockECReconstructionInfo> getErasureCodeCommand( + int maxTransfers) { return erasurecodeBlocks.poll(maxTransfers); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 53f7043..d344ca6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY; +import static 
org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION; import static org.apache.hadoop.util.Time.monotonicNow; import com.google.common.annotations.VisibleForTesting; @@ -40,7 +40,7 @@ import org.apache.hadoop.hdfs.server.namenode.CachedBlock; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.protocol.*; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock; import org.apache.hadoop.ipc.Server; @@ -1455,11 +1455,11 @@ public class DatanodeManager { pendingList)); } // check pending erasure coding tasks - List<BlockECRecoveryInfo> pendingECList = nodeinfo.getErasureCodeCommand( - maxTransfers); + List<BlockECReconstructionInfo> pendingECList = nodeinfo + .getErasureCodeCommand(maxTransfers); if (pendingECList != null) { - cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY, - pendingECList)); + cmds.add(new BlockECReconstructionCommand( + DNA_ERASURE_CODING_RECONSTRUCTION, pendingECList)); } // check block invalidation Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index bb2e492..fec669c 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@ -35,7 +35,8 @@ class ErasureCodingWork extends BlockRecoveryWork { super(block, bc, srcNodes, containingNodes, liveReplicaStorages, additionalReplRequired, priority); this.liveBlockIndicies = liveBlockIndicies; - BlockManager.LOG.debug("Creating an ErasureCodingWork to recover " + block); + BlockManager.LOG.debug("Creating an ErasureCodingWork to {} reconstruct ", + block); } byte[] getLiveBlockIndicies() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index cd30ead..5b5dc4c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.protocol.*; -import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.slf4j.Logger; @@ -725,9 +725,10 @@ class BPOfferService { 
dxcs.balanceThrottler.setBandwidth(bandwidth); } break; - case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: + case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: LOG.info("DatanodeCommand action: DNA_ERASURE_CODING_RECOVERY"); - Collection<BlockECRecoveryInfo> ecTasks = ((BlockECRecoveryCommand) cmd).getECTasks(); + Collection<BlockECReconstructionInfo> ecTasks = + ((BlockECReconstructionCommand) cmd).getECTasks(); dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks); break; default: @@ -759,7 +760,7 @@ class BPOfferService { case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE: case DatanodeProtocol.DNA_CACHE: case DatanodeProtocol.DNA_UNCACHE: - case DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY: + case DatanodeProtocol.DNA_ERASURE_CODING_RECONSTRUCTION: LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction()); break; default: http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index 6ad7164..60c8417 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactor import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import 
org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripingChunkReadResult; import org.apache.hadoop.io.IOUtils; @@ -83,10 +83,10 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; /** - * ErasureCodingWorker handles the erasure coding recovery work commands. These - * commands would be issued from Namenode as part of Datanode's heart beat - * response. BPOfferService delegates the work to this class for handling EC - * commands. + * ErasureCodingWorker handles the erasure coding reconstruction work commands. + * These commands would be issued from Namenode as part of Datanode's heart + * beat response. BPOfferService delegates the work to this class for handling + * EC commands. */ @InterfaceAudience.Private public final class ErasureCodingWorker { @@ -95,28 +95,28 @@ public final class ErasureCodingWorker { private final DataNode datanode; private final Configuration conf; - private ThreadPoolExecutor STRIPED_BLK_RECOVERY_THREAD_POOL; - private ThreadPoolExecutor STRIPED_READ_THREAD_POOL; - private final int STRIPED_READ_TIMEOUT_MILLIS; - private final int STRIPED_READ_BUFFER_SIZE; + private ThreadPoolExecutor EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL; + private ThreadPoolExecutor EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL; + private final int EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS; + private final int EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE; public ErasureCodingWorker(Configuration conf, DataNode datanode) { this.datanode = datanode; this.conf = conf; - STRIPED_READ_TIMEOUT_MILLIS = conf.getInt( - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_KEY, - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); + EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS = conf.getInt( + 
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS_DEFAULT); initializeStripedReadThreadPool(conf.getInt( - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_KEY, - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_THREADS_DEFAULT)); - STRIPED_READ_BUFFER_SIZE = conf.getInt( - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_KEY, - DFSConfigKeys.DFS_DATANODE_STRIPED_READ_BUFFER_SIZE_DEFAULT); - - initializeStripedBlkRecoveryThreadPool(conf.getInt( - DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_KEY, - DFSConfigKeys.DFS_DATANODE_STRIPED_BLK_RECOVERY_THREADS_DEFAULT)); + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_THREADS_DEFAULT)); + EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE = conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_DEFAULT); + + initializeStripedBlkReconstructionThreadPool(conf.getInt( + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY, + DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_DEFAULT)); } private RawErasureDecoder newDecoder(int numDataUnits, int numParityUnits) { @@ -126,8 +126,8 @@ public final class ErasureCodingWorker { private void initializeStripedReadThreadPool(int num) { LOG.debug("Using striped reads; pool threads=" + num); - STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60, - TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), + EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, + 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new Daemon.DaemonFactory() { private final AtomicInteger threadIndex = new AtomicInteger(0); @@ -146,48 +146,50 @@ public final class ErasureCodingWorker { super.rejectedExecution(runnable, e); } }); - STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); + 
EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL.allowCoreThreadTimeOut(true); } - private void initializeStripedBlkRecoveryThreadPool(int num) { - LOG.debug("Using striped block recovery; pool threads=" + num); - STRIPED_BLK_RECOVERY_THREAD_POOL = new ThreadPoolExecutor(2, num, 60, - TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), + private void initializeStripedBlkReconstructionThreadPool(int num) { + LOG.debug("Using striped block reconstruction; pool threads=" + num); + EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL = new ThreadPoolExecutor(2, num, + 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new Daemon.DaemonFactory() { private final AtomicInteger threadIdx = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread t = super.newThread(r); - t.setName("stripedBlockRecovery-" + threadIdx.getAndIncrement()); + t.setName( + "stripedBlockReconstruction-" + threadIdx.getAndIncrement()); return t; } }); - STRIPED_BLK_RECOVERY_THREAD_POOL.allowCoreThreadTimeOut(true); + EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.allowCoreThreadTimeOut(true); } /** - * Handles the Erasure Coding recovery work commands. - * + * Handles the Erasure Coding reconstruction work commands. 
+ * * @param ecTasks - * BlockECRecoveryInfo + * BlockECReconstructionInfo */ - public void processErasureCodingTasks(Collection<BlockECRecoveryInfo> ecTasks) { - for (BlockECRecoveryInfo recoveryInfo : ecTasks) { + public void processErasureCodingTasks( + Collection<BlockECReconstructionInfo> ecTasks) { + for (BlockECReconstructionInfo reconstructionInfo : ecTasks) { try { - STRIPED_BLK_RECOVERY_THREAD_POOL - .submit(new ReconstructAndTransferBlock(recoveryInfo)); + EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL + .submit(new ReconstructAndTransferBlock(reconstructionInfo)); } catch (Throwable e) { - LOG.warn("Failed to recover striped block " - + recoveryInfo.getExtendedBlock().getLocalBlock(), e); + LOG.warn("Failed to reconstruct striped block " + + reconstructionInfo.getExtendedBlock().getLocalBlock(), e); } } } /** - * ReconstructAndTransferBlock recover one or more missed striped block in the - * striped block group, the minimum number of live striped blocks should be - * no less than data block number. + * ReconstructAndTransferBlock reconstructs one or more missing striped blocks + * in the striped block group; the minimum number of live striped blocks + * should be no less than the number of data blocks. * * | <- Striped Block Group -> | * blk_0 blk_1 blk_2(*) blk_3 ... <- A striped block group @@ -203,12 +205,12 @@ public final class ErasureCodingWorker { * ... ... ... ... * * - * We use following steps to recover striped block group, in each round, we - * recover <code>bufferSize</code> data until finish, the + * We use the following steps to reconstruct a striped block group. In each + * round, we reconstruct <code>bufferSize</code> data until finished; the * <code>bufferSize</code> is configurable and may be less or larger than * cell size: * step1: read <code>bufferSize</code> data from minimum number of sources - * required by recovery. + * required by reconstruction. * step2: decode data for targets. * step3: transfer data to targets.
* @@ -217,25 +219,25 @@ public final class ErasureCodingWorker { * will be scheduled. The best sources are remembered for next round and * may be updated in each round. * - * In step2, typically if source blocks we read are all data blocks, we + * In step2, typically if source blocks we read are all data blocks, we * need to call encode, and if there is one parity block, we need to call - * decode. Notice we only read once and recover all missed striped block + * decode. Notice we only read once and reconstruct all missed striped block * if they are more than one. * - * In step3, send the recovered data to targets by constructing packet - * and send them directly. Same as continuous block replication, we - * don't check the packet ack. Since the datanode doing the recovery work - * are one of the source datanodes, so the recovered data are sent + * In step3, send the reconstructed data to targets by constructing packet + * and send them directly. Same as continuous block replication, we + * don't check the packet ack. Since the datanode doing the reconstruction + * work are one of the source datanodes, so the reconstructed data are sent * remotely. * * There are some points we can do further improvements in next phase: * 1. we can read the block file directly on the local datanode, * currently we use remote block reader. (Notice short-circuit is not * a good choice, see inline comments). - * 2. We need to check the packet ack for EC recovery? Since EC recovery - * is more expensive than continuous block replication, it needs to - * read from several other datanodes, should we make sure the - * recovered result received by targets? + * 2. We need to check the packet ack for EC reconstruction? Since EC + * reconstruction is more expensive than continuous block replication, + * it needs to read from several other datanodes, should we make sure + * the reconstructed result received by targets? 
*/ private class ReconstructAndTransferBlock implements Runnable { private final int dataBlkNum; @@ -288,20 +290,22 @@ public final class ErasureCodingWorker { private final Map<Future<Void>, Integer> futures = new HashMap<>(); private final CompletionService<Void> readService = - new ExecutorCompletionService<>(STRIPED_READ_THREAD_POOL); + new ExecutorCompletionService<>( + EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL); - ReconstructAndTransferBlock(BlockECRecoveryInfo recoveryInfo) { - ErasureCodingPolicy ecPolicy = recoveryInfo.getErasureCodingPolicy(); + ReconstructAndTransferBlock(BlockECReconstructionInfo reconstructionInfo) { + ErasureCodingPolicy ecPolicy = reconstructionInfo + .getErasureCodingPolicy(); dataBlkNum = ecPolicy.getNumDataUnits(); parityBlkNum = ecPolicy.getNumParityUnits(); cellSize = ecPolicy.getCellSize(); - blockGroup = recoveryInfo.getExtendedBlock(); + blockGroup = reconstructionInfo.getExtendedBlock(); final int cellsNum = (int)((blockGroup.getNumBytes() - 1) / cellSize + 1); minRequiredSources = Math.min(cellsNum, dataBlkNum); - liveIndices = recoveryInfo.getLiveBlockIndices(); - sources = recoveryInfo.getSourceDnInfos(); + liveIndices = reconstructionInfo.getLiveBlockIndices(); + sources = reconstructionInfo.getSourceDnInfos(); stripedReaders = new ArrayList<>(sources.length); Preconditions.checkArgument(liveIndices.length >= minRequiredSources, @@ -315,8 +319,8 @@ public final class ErasureCodingWorker { zeroStripeIndices = new short[dataBlkNum - minRequiredSources]; } - targets = recoveryInfo.getTargetDnInfos(); - targetStorageTypes = recoveryInfo.getTargetStorageTypes(); + targets = reconstructionInfo.getTargetDnInfos(); + targetStorageTypes = reconstructionInfo.getTargetStorageTypes(); targetIndices = new short[targets.length]; targetBuffers = new ByteBuffer[targets.length]; @@ -402,7 +406,7 @@ public final class ErasureCodingWorker { if (nsuccess < minRequiredSources) { String error = "Can't find minimum sources required by " - + 
"recovery, block id: " + blockGroup.getBlockId(); + + "reconstruction, block id: " + blockGroup.getBlockId(); throw new IOException(error); } @@ -441,21 +445,21 @@ public final class ErasureCodingWorker { getBlockLen(blockGroup, targetIndex)); } while (positionInBlock < maxTargetLength) { - final int toRecover = (int) Math.min( + final int toReconstruct = (int) Math.min( bufferSize, maxTargetLength - positionInBlock); // step1: read from minimum source DNs required for reconstruction. // The returned success list is the source DNs we do real read from Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap = new HashMap<>(); try { - success = readMinimumStripedData4Recovery(success, toRecover, - corruptionMap); + success = readMinimumStripedData4Reconstruction(success, + toReconstruct, corruptionMap); } finally { // report corrupted blocks to NN reportCorruptedBlocks(corruptionMap); } // step2: decode to reconstruct targets - recoverTargets(success, targetsStatus, toRecover); + reconstructTargets(success, targetsStatus, toReconstruct); // step3: transfer data if (transferData2Targets(targetsStatus) == 0) { @@ -464,7 +468,7 @@ public final class ErasureCodingWorker { } clearBuffers(); - positionInBlock += toRecover; + positionInBlock += toReconstruct; } endTargetBlocks(targetsStatus); @@ -472,7 +476,7 @@ public final class ErasureCodingWorker { // Currently we don't check the acks for packets, this is similar as // block replication. 
} catch (Throwable e) { - LOG.warn("Failed to recover striped block: " + blockGroup, e); + LOG.warn("Failed to reconstruct striped block: " + blockGroup, e); } finally { datanode.decrementXmitsInProgress(); // close block readers @@ -493,7 +497,7 @@ public final class ErasureCodingWorker { checksum = blockReader.getDataChecksum(); bytesPerChecksum = checksum.getBytesPerChecksum(); // The bufferSize is flat to divide bytesPerChecksum - int readBufferSize = STRIPED_READ_BUFFER_SIZE; + int readBufferSize = EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE; bufferSize = readBufferSize < bytesPerChecksum ? bytesPerChecksum : readBufferSize - readBufferSize % bytesPerChecksum; } else { @@ -521,11 +525,11 @@ public final class ErasureCodingWorker { } } - /** the reading length should not exceed the length for recovery */ - private int getReadLength(int index, int recoverLength) { + /** the reading length should not exceed the length for reconstruction. */ + private int getReadLength(int index, int reconstructLength) { long blockLen = getBlockLen(blockGroup, index); long remaining = blockLen - positionInBlock; - return (int) Math.min(remaining, recoverLength); + return (int) Math.min(remaining, reconstructLength); } /** @@ -538,15 +542,16 @@ public final class ErasureCodingWorker { * operations and next iteration read. * * @param success the initial success list of source DNs we think best - * @param recoverLength the length to recover. + * @param reconstructLength the length to reconstruct. 
* @return updated success list of source DNs we do real read * @throws IOException */ - private int[] readMinimumStripedData4Recovery(final int[] success, - int recoverLength, Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) - throws IOException { - Preconditions.checkArgument(recoverLength >= 0 && - recoverLength <= bufferSize); + private int[] readMinimumStripedData4Reconstruction(final int[] success, + int reconstructLength, + Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) + throws IOException { + Preconditions.checkArgument(reconstructLength >= 0 && + reconstructLength <= bufferSize); int nsuccess = 0; int[] newSuccess = new int[minRequiredSources]; BitSet used = new BitSet(sources.length); @@ -557,7 +562,7 @@ public final class ErasureCodingWorker { for (int i = 0; i < minRequiredSources; i++) { StripedReader reader = stripedReaders.get(success[i]); final int toRead = getReadLength(liveIndices[success[i]], - recoverLength); + reconstructLength); if (toRead > 0) { Callable<Void> readCallable = readFromBlock(reader, reader.buffer, toRead, corruptionMap); @@ -573,9 +578,9 @@ public final class ErasureCodingWorker { while (!futures.isEmpty()) { try { - StripingChunkReadResult result = - StripedBlockUtil.getNextCompletedStripedRead( - readService, futures, STRIPED_READ_TIMEOUT_MILLIS); + StripingChunkReadResult result = StripedBlockUtil + .getNextCompletedStripedRead(readService, futures, + EC_RECONSTRUCTION_STRIPED_READ_TIMEOUT_MILLIS); int resultIndex = -1; if (result.state == StripingChunkReadResult.SUCCESSFUL) { resultIndex = result.index; @@ -585,10 +590,12 @@ public final class ErasureCodingWorker { StripedReader failedReader = stripedReaders.get(result.index); closeBlockReader(failedReader.blockReader); failedReader.blockReader = null; - resultIndex = scheduleNewRead(used, recoverLength, corruptionMap); + resultIndex = scheduleNewRead(used, reconstructLength, + corruptionMap); } else if (result.state == StripingChunkReadResult.TIMEOUT) { // If 
timeout, we also schedule a new read. - resultIndex = scheduleNewRead(used, recoverLength, corruptionMap); + resultIndex = scheduleNewRead(used, reconstructLength, + corruptionMap); } if (resultIndex >= 0) { newSuccess[nsuccess++] = resultIndex; @@ -643,20 +650,20 @@ public final class ErasureCodingWorker { return Arrays.copyOf(result, m); } - private void recoverTargets(int[] success, boolean[] targetsStatus, - int toRecoverLen) { + private void reconstructTargets(int[] success, boolean[] targetsStatus, + int toReconstructLen) { initDecoderIfNecessary(); ByteBuffer[] inputs = new ByteBuffer[dataBlkNum + parityBlkNum]; for (int i = 0; i < success.length; i++) { StripedReader reader = stripedReaders.get(success[i]); ByteBuffer buffer = reader.buffer; - paddingBufferToLen(buffer, toRecoverLen); + paddingBufferToLen(buffer, toReconstructLen); inputs[reader.index] = (ByteBuffer)buffer.flip(); } if (success.length < dataBlkNum) { for (int i = 0; i < zeroStripeBuffers.length; i++) { ByteBuffer buffer = zeroStripeBuffers[i]; - paddingBufferToLen(buffer, toRecoverLen); + paddingBufferToLen(buffer, toReconstructLen); int index = zeroStripeIndices[i]; inputs[index] = (ByteBuffer)buffer.flip(); } @@ -666,7 +673,7 @@ public final class ErasureCodingWorker { int m = 0; for (int i = 0; i < targetBuffers.length; i++) { if (targetsStatus[i]) { - targetBuffers[i].limit(toRecoverLen); + targetBuffers[i].limit(toReconstructLen); outputs[m++] = targetBuffers[i]; } } @@ -678,7 +685,7 @@ public final class ErasureCodingWorker { long remaining = blockLen - positionInBlock; if (remaining <= 0) { targetBuffers[i].limit(0); - } else if (remaining < toRecoverLen) { + } else if (remaining < toReconstructLen) { targetBuffers[i].limit((int)remaining); } } @@ -696,7 +703,7 @@ public final class ErasureCodingWorker { * @param used the used source DNs in this iteration. * @return the array index of source DN if don't need to do real read. 
*/ - private int scheduleNewRead(BitSet used, int recoverLength, + private int scheduleNewRead(BitSet used, int reconstructLen, Map<ExtendedBlock, Set<DatanodeInfo>> corruptionMap) { StripedReader reader = null; // step1: initially we may only have <code>minRequiredSources</code> @@ -707,7 +714,7 @@ public final class ErasureCodingWorker { int toRead = 0; while (reader == null && m < sources.length) { reader = addStripedReader(m, positionInBlock); - toRead = getReadLength(liveIndices[m], recoverLength); + toRead = getReadLength(liveIndices[m], reconstructLen); if (toRead > 0) { if (reader.blockReader == null) { reader = null; @@ -727,7 +734,7 @@ public final class ErasureCodingWorker { for (int i = 0; reader == null && i < stripedReaders.size(); i++) { if (!used.get(i)) { StripedReader r = stripedReaders.get(i); - toRead = getReadLength(liveIndices[i], recoverLength); + toRead = getReadLength(liveIndices[i], reconstructLen); if (toRead > 0) { closeBlockReader(r.blockReader); r.blockReader = newBlockReader( http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java new file mode 100644 index 0000000..6e9c55b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECReconstructionCommand.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.protocol; + +import com.google.common.base.Joiner; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; + +import java.util.Arrays; +import java.util.Collection; + +/** + * A BlockECReconstructionCommand is an instruction to a DataNode to + * reconstruct a striped block group with missing blocks. + * + * Upon receiving this command, the DataNode pulls data from other DataNodes + * hosting blocks in this group and reconstructs the lost blocks through codec + * calculation. + * + * After the reconstruction, the DataNode pushes the reconstructed blocks to + * their final destinations if necessary (e.g., the destination is different + * from the reconstruction node, or multiple blocks in a group are to be + * reconstructed). 
+ */ [email protected] [email protected] +public class BlockECReconstructionCommand extends DatanodeCommand { + private final Collection<BlockECReconstructionInfo> ecTasks; + + /** + * Create BlockECReconstructionCommand from a collection of + * {@link BlockECReconstructionInfo}, each representing a reconstruction + * task + */ + public BlockECReconstructionCommand(int action, + Collection<BlockECReconstructionInfo> blockECReconstructionInfoList) { + super(action); + this.ecTasks = blockECReconstructionInfoList; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("BlockECReconstructionCommand(\n "); + Joiner.on("\n ").appendTo(sb, ecTasks); + sb.append("\n)"); + return sb.toString(); + } + + /** Block and targets pair */ + @InterfaceAudience.Private + @InterfaceStability.Evolving + public static class BlockECReconstructionInfo { + private final ExtendedBlock block; + private final DatanodeInfo[] sources; + private DatanodeInfo[] targets; + private String[] targetStorageIDs; + private StorageType[] targetStorageTypes; + private final byte[] liveBlockIndices; + private final ErasureCodingPolicy ecPolicy; + + public BlockECReconstructionInfo(ExtendedBlock block, + DatanodeInfo[] sources, DatanodeStorageInfo[] targetDnStorageInfo, + byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { + this(block, sources, DatanodeStorageInfo + .toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo + .toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo + .toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecPolicy); + } + + public BlockECReconstructionInfo(ExtendedBlock block, + DatanodeInfo[] sources, DatanodeInfo[] targets, + String[] targetStorageIDs, StorageType[] targetStorageTypes, + byte[] liveBlockIndices, ErasureCodingPolicy ecPolicy) { + this.block = block; + this.sources = sources; + this.targets = targets; + this.targetStorageIDs = targetStorageIDs; + this.targetStorageTypes = targetStorageTypes; + 
this.liveBlockIndices = liveBlockIndices == null ? + new byte[]{} : liveBlockIndices; + this.ecPolicy = ecPolicy; + } + + public ExtendedBlock getExtendedBlock() { + return block; + } + + public DatanodeInfo[] getSourceDnInfos() { + return sources; + } + + public DatanodeInfo[] getTargetDnInfos() { + return targets; + } + + public String[] getTargetStorageIDs() { + return targetStorageIDs; + } + + public StorageType[] getTargetStorageTypes() { + return targetStorageTypes; + } + + public byte[] getLiveBlockIndices() { + return liveBlockIndices; + } + + public ErasureCodingPolicy getErasureCodingPolicy() { + return ecPolicy; + } + + @Override + public String toString() { + return new StringBuilder().append("BlockECReconstructionInfo(\n ") + .append("Recovering ").append(block).append(" From: ") + .append(Arrays.asList(sources)).append(" To: [") + .append(Arrays.asList(targets)).append(")\n") + .append(" Block Indices: ").append(Arrays.toString(liveBlockIndices)) + .toString(); + } + } + + public Collection<BlockECReconstructionInfo> getECTasks() { + return this.ecTasks; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java deleted file mode 100644 index d0c1786..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockECRecoveryCommand.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.protocol; - -import com.google.common.base.Joiner; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; - -import java.util.Arrays; -import java.util.Collection; - -/** - * A BlockECRecoveryCommand is an instruction to a DataNode to reconstruct a - * striped block group with missing blocks. - * - * Upon receiving this command, the DataNode pulls data from other DataNodes - * hosting blocks in this group and reconstructs the lost blocks through codec - * calculation. - * - * After the reconstruction, the DataNode pushes the reconstructed blocks to - * their final destinations if necessary (e.g., the destination is different - * from the reconstruction node, or multiple blocks in a group are to be - * reconstructed). 
- */ [email protected] [email protected] -public class BlockECRecoveryCommand extends DatanodeCommand { - final Collection<BlockECRecoveryInfo> ecTasks; - - /** - * Create BlockECRecoveryCommand from a collection of - * {@link BlockECRecoveryInfo}, each representing a recovery task - */ - public BlockECRecoveryCommand(int action, - Collection<BlockECRecoveryInfo> blockECRecoveryInfoList) { - super(action); - this.ecTasks = blockECRecoveryInfoList; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("BlockECRecoveryCommand(\n "); - Joiner.on("\n ").appendTo(sb, ecTasks); - sb.append("\n)"); - return sb.toString(); - } - - /** Block and targets pair */ - @InterfaceAudience.Private - @InterfaceStability.Evolving - public static class BlockECRecoveryInfo { - private final ExtendedBlock block; - private final DatanodeInfo[] sources; - private DatanodeInfo[] targets; - private String[] targetStorageIDs; - private StorageType[] targetStorageTypes; - private final byte[] liveBlockIndices; - private final ErasureCodingPolicy ecPolicy; - - public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, - DatanodeStorageInfo[] targetDnStorageInfo, byte[] liveBlockIndices, - ErasureCodingPolicy ecPolicy) { - this(block, sources, DatanodeStorageInfo - .toDatanodeInfos(targetDnStorageInfo), DatanodeStorageInfo - .toStorageIDs(targetDnStorageInfo), DatanodeStorageInfo - .toStorageTypes(targetDnStorageInfo), liveBlockIndices, ecPolicy); - } - - public BlockECRecoveryInfo(ExtendedBlock block, DatanodeInfo[] sources, - DatanodeInfo[] targets, String[] targetStorageIDs, - StorageType[] targetStorageTypes, byte[] liveBlockIndices, - ErasureCodingPolicy ecPolicy) { - this.block = block; - this.sources = sources; - this.targets = targets; - this.targetStorageIDs = targetStorageIDs; - this.targetStorageTypes = targetStorageTypes; - this.liveBlockIndices = liveBlockIndices == null ? 
- new byte[]{} : liveBlockIndices; - this.ecPolicy = ecPolicy; - } - - public ExtendedBlock getExtendedBlock() { - return block; - } - - public DatanodeInfo[] getSourceDnInfos() { - return sources; - } - - public DatanodeInfo[] getTargetDnInfos() { - return targets; - } - - public String[] getTargetStorageIDs() { - return targetStorageIDs; - } - - public StorageType[] getTargetStorageTypes() { - return targetStorageTypes; - } - - public byte[] getLiveBlockIndices() { - return liveBlockIndices; - } - - public ErasureCodingPolicy getErasureCodingPolicy() { - return ecPolicy; - } - - @Override - public String toString() { - return new StringBuilder().append("BlockECRecoveryInfo(\n ") - .append("Recovering ").append(block).append(" From: ") - .append(Arrays.asList(sources)).append(" To: [") - .append(Arrays.asList(targets)).append(")\n") - .append(" Block Indices: ").append(Arrays.toString(liveBlockIndices)) - .toString(); - } - } - - public Collection<BlockECRecoveryInfo> getECTasks() { - return this.ecTasks; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index b962855..8c4359f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -76,7 +76,7 @@ public interface DatanodeProtocol { final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth final static int DNA_CACHE = 9; // cache blocks final static int DNA_UNCACHE = 10; // uncache blocks - final 
static int DNA_ERASURE_CODING_RECOVERY = 11; // erasure coding recovery command + final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction command /** * Register Datanode. http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 02d5b81..7111185 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -60,7 +60,7 @@ message DatanodeCommandProto { UnusedUpgradeCommand = 6; NullDatanodeCommand = 7; BlockIdCommand = 8; - BlockECRecoveryCommand = 9; + BlockECReconstructionCommand = 9; } required Type cmdType = 1; // Type of the command @@ -74,7 +74,7 @@ message DatanodeCommandProto { optional KeyUpdateCommandProto keyUpdateCmd = 6; optional RegisterCommandProto registerCmd = 7; optional BlockIdCommandProto blkIdCmd = 8; - optional BlockECRecoveryCommandProto blkECRecoveryCmd = 9; + optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9; } /** @@ -149,10 +149,10 @@ message RegisterCommandProto { } /** - * Block Erasure coding recovery command + * Block Erasure coding reconstruction command */ -message BlockECRecoveryCommandProto { - repeated BlockECRecoveryInfoProto blockECRecoveryinfo = 1; +message BlockECReconstructionCommandProto { + repeated BlockECReconstructionInfoProto blockECReconstructioninfo = 1; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7607c32..4889bc3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2654,27 +2654,38 @@ </property> <property> - <name>dfs.datanode.stripedread.timeout.millis</name> + <name>dfs.datanode.ec.reconstruction.stripedread.timeout.millis</name> <value>5000</value> <description>Datanode striped read timeout in milliseconds. </description> </property> <property> - <name>dfs.datanode.stripedread.threads</name> + <name>dfs.datanode.ec.reconstruction.stripedread.threads</name> <value>20</value> - <description>Number of threads used by the Datanode for background recovery work. + <description> + Number of threads used by the Datanode to read striped block + during background reconstruction work. </description> </property> <property> - <name>dfs.datanode.stripedread.buffer.size</name> + <name>dfs.datanode.ec.reconstruction.stripedread.buffer.size</name> <value>65536</value> <description>Datanode striped read buffer size. </description> </property> <property> + <name>dfs.datanode.ec.reconstruction.stripedblock.threads.size</name> + <value>8</value> + <description> + Number of threads used by the Datanode for background + reconstruction work. 
+ </description> +</property> + +<property> <name>dfs.namenode.quota.init-threads</name> <value>4</value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ae543fd/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java new file mode 100644 index 0000000..97edaf1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class TestReconstructStripedFile { + public static final Log LOG = LogFactory.getLog(TestReconstructStripedFile.class); + + private static final int dataBlkNum = StripedFileTestUtil.NUM_DATA_BLOCKS; + private static final int parityBlkNum = StripedFileTestUtil.NUM_PARITY_BLOCKS; + private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE; + private static final int blockSize = cellSize * 3; + 
private static final int groupSize = dataBlkNum + parityBlkNum; + private static final int dnNum = groupSize + parityBlkNum; + + static { + GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL); + GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL); + GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL); + } + + enum ReconstructionType { + DataOnly, + ParityOnly, + Any + } + + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + // Map: DatanodeID -> datanode index in cluster + private Map<DatanodeID, Integer> dnMap = new HashMap<>(); + private final Random random = new Random(); + + @Before + public void setup() throws IOException { + final Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY, + cellSize - 1); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, + false); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dnNum).build(); + cluster.waitActive(); + + fs = cluster.getFileSystem(); + fs.getClient().setErasureCodingPolicy("/", null); + + List<DataNode> datanodes = cluster.getDataNodes(); + for (int i = 0; i < dnNum; i++) { + dnMap.put(datanodes.get(i).getDatanodeId(), i); + } + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksReconstruction("/testRecoverOneParityBlock", fileLen, + ReconstructionType.ParityOnly, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock1() throws Exception { + int fileLen = cellSize + cellSize/10; + assertFileBlocksReconstruction("/testRecoverOneParityBlock1", fileLen, + ReconstructionType.ParityOnly, 1); + } + + @Test(timeout = 
120000) + public void testRecoverOneParityBlock2() throws Exception { + int fileLen = 1; + assertFileBlocksReconstruction("/testRecoverOneParityBlock2", fileLen, + ReconstructionType.ParityOnly, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneParityBlock3() throws Exception { + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksReconstruction("/testRecoverOneParityBlock3", fileLen, + ReconstructionType.ParityOnly, 1); + } + + @Test(timeout = 120000) + public void testRecoverThreeParityBlocks() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksReconstruction("/testRecoverThreeParityBlocks", fileLen, + ReconstructionType.ParityOnly, 3); + } + + @Test(timeout = 120000) + public void testRecoverThreeDataBlocks() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksReconstruction("/testRecoverThreeDataBlocks", fileLen, + ReconstructionType.DataOnly, 3); + } + + @Test(timeout = 120000) + public void testRecoverThreeDataBlocks1() throws Exception { + int fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksReconstruction("/testRecoverThreeDataBlocks1", fileLen, + ReconstructionType.DataOnly, 3); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksReconstruction("/testRecoverOneDataBlock", fileLen, + ReconstructionType.DataOnly, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock1() throws Exception { + int fileLen = cellSize + cellSize/10; + assertFileBlocksReconstruction("/testRecoverOneDataBlock1", fileLen, + ReconstructionType.DataOnly, 1); + } + + @Test(timeout = 120000) + public void testRecoverOneDataBlock2() throws Exception { + int fileLen = 1; + assertFileBlocksReconstruction("/testRecoverOneDataBlock2", fileLen, + ReconstructionType.DataOnly, 1); + } + + @Test(timeout = 120000) + public void testRecoverAnyBlocks() throws Exception { + int 
fileLen = 3 * blockSize + blockSize/10; + assertFileBlocksReconstruction("/testRecoverAnyBlocks", fileLen, + ReconstructionType.Any, 2); + } + + @Test(timeout = 120000) + public void testRecoverAnyBlocks1() throws Exception { + int fileLen = 10 * blockSize + blockSize/10; + assertFileBlocksReconstruction("/testRecoverAnyBlocks1", fileLen, + ReconstructionType.Any, 3); + } + + private int[] generateDeadDnIndices(ReconstructionType type, int deadNum, + byte[] indices) { + List<Integer> deadList = new ArrayList<>(deadNum); + while (deadList.size() < deadNum) { + int dead = random.nextInt(indices.length); + boolean isOfType = true; + if (type == ReconstructionType.DataOnly) { + isOfType = indices[dead] < dataBlkNum; + } else if (type == ReconstructionType.ParityOnly) { + isOfType = indices[dead] >= dataBlkNum; + } + if (isOfType && !deadList.contains(dead)) { + deadList.add(dead); + } + } + int[] d = new int[deadNum]; + for (int i = 0; i < deadNum; i++) { + d[i] = deadList.get(i); + } + return d; + } + + private void shutdownDataNodes(DataNode dn) throws IOException { + /* + * Kill the datanode which contains one replica + * We need to make sure it dead in namenode: clear its update time and + * trigger NN to check heartbeat. 
+ */ + dn.shutdown(); + cluster.setDataNodeDead(dn.getDatanodeId()); + } + + private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets, + ReconstructionType type) + throws IOException { + int stoppedDN = 0; + for (Map.Entry<ExtendedBlock, DataNode> target : corruptTargets.entrySet()) { + if (stoppedDN == 0 || type != ReconstructionType.DataOnly + || random.nextBoolean()) { + // stop at least one DN to trigger reconstruction + LOG.info("Note: stop DataNode " + target.getValue().getDisplayName() + + " with internal block " + target.getKey()); + shutdownDataNodes(target.getValue()); + stoppedDN++; + } else { // corrupt the data on the DN + LOG.info("Note: corrupt data on " + target.getValue().getDisplayName() + + " with internal block " + target.getKey()); + cluster.corruptReplica(target.getValue(), target.getKey()); + } + } + return stoppedDN; + } + + /** + * Test the file blocks reconstruction. + * 1. Check the replica is reconstructed in the target datanode, + * and verify the block replica length, generationStamp and content. + * 2. Read the file and verify content. 
+ */ + private void assertFileBlocksReconstruction(String fileName, int fileLen, + ReconstructionType type, int toRecoverBlockNum) throws Exception { + if (toRecoverBlockNum < 1 || toRecoverBlockNum > parityBlkNum) { + Assert.fail("toRecoverBlockNum should be between 1 ~ " + parityBlkNum); + } + + Path file = new Path(fileName); + + final byte[] data = new byte[fileLen]; + Arrays.fill(data, (byte) 1); + DFSTestUtil.writeFile(fs, file, data); + StripedFileTestUtil.waitBlockGroupsReported(fs, fileName); + + LocatedBlocks locatedBlocks = getLocatedBlocks(file); + assertEquals(locatedBlocks.getFileLength(), fileLen); + + LocatedStripedBlock lastBlock = + (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + + DatanodeInfo[] storageInfos = lastBlock.getLocations(); + byte[] indices = lastBlock.getBlockIndices(); + + BitSet bitset = new BitSet(dnNum); + for (DatanodeInfo storageInfo : storageInfos) { + bitset.set(dnMap.get(storageInfo)); + } + + int[] dead = generateDeadDnIndices(type, toRecoverBlockNum, indices); + LOG.info("Note: indices == " + Arrays.toString(indices) + + ". Generate errors on datanodes: " + Arrays.toString(dead)); + + DatanodeInfo[] dataDNs = new DatanodeInfo[toRecoverBlockNum]; + int[] deadDnIndices = new int[toRecoverBlockNum]; + ExtendedBlock[] blocks = new ExtendedBlock[toRecoverBlockNum]; + File[] replicas = new File[toRecoverBlockNum]; + File[] metadatas = new File[toRecoverBlockNum]; + byte[][] replicaContents = new byte[toRecoverBlockNum][]; + Map<ExtendedBlock, DataNode> errorMap = new HashMap<>(dead.length); + for (int i = 0; i < toRecoverBlockNum; i++) { + dataDNs[i] = storageInfos[dead[i]]; + deadDnIndices[i] = dnMap.get(dataDNs[i]); + + // Check the block replica file on deadDn before it dead. 
+ blocks[i] = StripedBlockUtil.constructInternalBlock( + lastBlock.getBlock(), cellSize, dataBlkNum, indices[dead[i]]); + errorMap.put(blocks[i], cluster.getDataNodes().get(deadDnIndices[i])); + replicas[i] = cluster.getBlockFile(deadDnIndices[i], blocks[i]); + metadatas[i] = cluster.getBlockMetadataFile(deadDnIndices[i], blocks[i]); + // the block replica on the datanode should be the same as expected + assertEquals(replicas[i].length(), + StripedBlockUtil.getInternalBlockLength( + lastBlock.getBlockSize(), cellSize, dataBlkNum, indices[dead[i]])); + assertTrue(metadatas[i].getName(). + endsWith(blocks[i].getGenerationStamp() + ".meta")); + LOG.info("replica " + i + " locates in file: " + replicas[i]); + replicaContents[i] = DFSTestUtil.readFileAsBytes(replicas[i]); + } + + int cellsNum = (fileLen - 1) / cellSize + 1; + int groupSize = Math.min(cellsNum, dataBlkNum) + parityBlkNum; + + // shutdown datanodes or generate corruption + int stoppedDN = generateErrors(errorMap, type); + + // Check the locatedBlocks of the file again + locatedBlocks = getLocatedBlocks(file); + lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + storageInfos = lastBlock.getLocations(); + assertEquals(storageInfos.length, groupSize - stoppedDN); + + int[] targetDNs = new int[dnNum - groupSize]; + int n = 0; + for (int i = 0; i < dnNum; i++) { + if (!bitset.get(i)) { // not contain replica of the block. + targetDNs[n++] = i; + } + } + + waitForReconstructionFinished(file, groupSize); + + targetDNs = sortTargetsByReplicas(blocks, targetDNs); + + // Check the replica on the new target node. 
+ for (int i = 0; i < toRecoverBlockNum; i++) { + File replicaAfterReconstruction = cluster.getBlockFile(targetDNs[i], blocks[i]); + LOG.info("replica after reconstruction " + replicaAfterReconstruction); + File metadataAfterReconstruction = + cluster.getBlockMetadataFile(targetDNs[i], blocks[i]); + assertEquals(replicaAfterReconstruction.length(), replicas[i].length()); + LOG.info("replica before " + replicas[i]); + assertTrue(metadataAfterReconstruction.getName(). + endsWith(blocks[i].getGenerationStamp() + ".meta")); + byte[] replicaContentAfterReconstruction = + DFSTestUtil.readFileAsBytes(replicaAfterReconstruction); + + Assert.assertArrayEquals(replicaContents[i], replicaContentAfterReconstruction); + } + } + + private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) { + int[] result = new int[blocks.length]; + for (int i = 0; i < blocks.length; i++) { + result[i] = -1; + for (int j = 0; j < targetDNs.length; j++) { + if (targetDNs[j] != -1) { + File replica = cluster.getBlockFile(targetDNs[j], blocks[i]); + if (replica != null) { + result[i] = targetDNs[j]; + targetDNs[j] = -1; + break; + } + } + } + if (result[i] == -1) { + Assert.fail("Failed to reconstruct striped block: " + + blocks[i].getBlockId()); + } + } + return result; + } + + private LocatedBlocks waitForReconstructionFinished(Path file, int groupSize) + throws Exception { + final int ATTEMPTS = 60; + for (int i = 0; i < ATTEMPTS; i++) { + LocatedBlocks locatedBlocks = getLocatedBlocks(file); + LocatedStripedBlock lastBlock = + (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock(); + DatanodeInfo[] storageInfos = lastBlock.getLocations(); + if (storageInfos.length >= groupSize) { + return locatedBlocks; + } + Thread.sleep(1000); + } + throw new IOException ("Time out waiting for EC block reconstruction."); + } + + private LocatedBlocks getLocatedBlocks(Path file) throws IOException { + return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE); + } + + /* + * 
Tests that processErasureCodingTasks should not throw exceptions out due to + * invalid ECTask submission. + */ + @Test + public void testProcessErasureCodingTasksSubmitionShouldSucceed() + throws Exception { + DataNode dataNode = cluster.dataNodes.get(0).datanode; + + // Pack invalid(dummy) parameters in ecTasks. Irrespective of parameters, each task + // thread pool submission should succeed, so that it will not prevent + // processing other tasks in the list if any exceptions. + int size = cluster.dataNodes.size(); + byte[] liveIndices = new byte[size]; + DatanodeInfo[] dataDNs = new DatanodeInfo[size + 1]; + DatanodeStorageInfo targetDnInfos_1 = BlockManagerTestUtil + .newDatanodeStorageInfo(DFSTestUtil.getLocalDatanodeDescriptor(), + new DatanodeStorage("s01")); + DatanodeStorageInfo[] dnStorageInfo = new DatanodeStorageInfo[] { + targetDnInfos_1 }; + + BlockECReconstructionInfo invalidECInfo = new BlockECReconstructionInfo( + new ExtendedBlock("bp-id", 123456), dataDNs, dnStorageInfo, liveIndices, + ErasureCodingPolicyManager.getSystemDefaultPolicy()); + List<BlockECReconstructionInfo> ecTasks = new ArrayList<>(); + ecTasks.add(invalidECInfo); + dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks); + } +}
