HDFS-13165: [SPS]: Collects successfully moved block details via IBR. Contributed by Rakesh R.
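Summary of the change: DataNodes no longer piggyback a BlocksStorageMoveAttemptFinished report on the heartbeat. Instead, the NameNode derives "move attempt finished" from its incremental block report (IBR) processing path, matching each reported replica against the (datanode, storage type) targets that SPS scheduled for that block. Below is a minimal sketch of that matching logic; the class and method names (SpsIbrNotificationSketch, scheduleBlockMove, the string-keyed target set) are illustrative stand-ins, not the committed HDFS classes, which appear in the diff that follows.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SpsIbrNotificationSketch {
  // Block id -> the "datanode:storageType" targets that must still be
  // reported via IBR before the move attempt counts as finished.
  private final Map<Long, Set<String>> scheduledBlkLocs = new HashMap<>();
  // Fully reported blocks, drained asynchronously by the SPS monitor thread.
  private final BlockingQueue<Long> movementFinishedBlocks =
      new LinkedBlockingQueue<>();

  // Called when SPS dispatches a move attempt; targets must be mutable.
  synchronized void scheduleBlockMove(long blockId, Set<String> targets) {
    scheduledBlkLocs.put(blockId, targets);
  }

  // Called from the IBR processing path for every received replica. If the
  // replica landed on a scheduled target and it was the last outstanding
  // target, the whole attempt has finished.
  synchronized void notifyReportedBlock(long blockId, String datanode,
      String storageType) {
    Set<String> targets = scheduledBlkLocs.get(blockId);
    if (targets == null) {
      return; // unknown block: not scheduled by SPS, simply skip
    }
    if (targets.remove(datanode + ":" + storageType) && targets.isEmpty()) {
      movementFinishedBlocks.add(blockId);
      scheduledBlkLocs.remove(blockId); // clean up the reported block
    }
  }
}

The real wiring is visible in the diff: BlockManager.addBlock() calls notifyStorageMovementAttemptFinishedBlk(), which forwards the reported (datanode, storage type, block) to the internal SPSService and ultimately to BlockStorageMovementAttemptedItems.notifyReportedBlock(). This removes the datanode-side result queues and the extra heartbeat protobuf field, since the IBR already tells the NameNode exactly which replica landed on which storage.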
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8bc72521
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8bc72521
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8bc72521

Branch: refs/heads/HDFS-10285
Commit: 8bc72521fbfe2c711cc7a125079a7c3dc4f2ddf1
Parents: 6fed235
Author: Rakesh Radhakrishnan <rake...@apache.org>
Authored: Sun Apr 29 11:06:59 2018 +0530
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Thu Jul 19 22:50:05 2018 +0530

----------------------------------------------------------------------
 .../DatanodeProtocolClientSideTranslatorPB.java |  11 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  25 ---
 .../server/blockmanagement/BlockManager.java    |  86 +++++++++-
 .../sps/BlockMovementAttemptFinished.java       |  24 ++-
 .../common/sps/BlockStorageMovementTracker.java | 109 +----------
 .../sps/BlocksMovementsStatusHandler.java       |  70 +------
 .../hdfs/server/datanode/BPServiceActor.java    |  14 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |   7 +-
 .../datanode/StoragePolicySatisfyWorker.java    |  48 ++----
 .../namenode/FSDirSatisfyStoragePolicyOp.java   |  13 +-
 .../hdfs/server/namenode/FSDirXAttrOp.java      |   8 +-
 .../hdfs/server/namenode/FSDirectory.java       |   5 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  30 ++--
 .../hadoop/hdfs/server/namenode/NameNode.java   |  19 ++-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  46 +++--
 .../sps/BlockStorageMovementAttemptedItems.java | 167 +++++++++++++------
 .../hdfs/server/namenode/sps/SPSService.java    |  19 ++-
 .../namenode/sps/StoragePolicySatisfier.java    | 154 +++++++++++------
 .../hdfs/server/protocol/DatanodeProtocol.java  |   4 +-
 .../sps/ExternalSPSBlockMoveTaskHandler.java    |  32 ++--
 .../sps/ExternalStoragePolicySatisfier.java     |   3 +-
 .../src/main/proto/DatanodeProtocol.proto       |   9 -
 .../src/main/resources/hdfs-default.xml         |  41 +++++
 .../TestNameNodePrunesMissingStorages.java      |   4 +-
 .../datanode/InternalDataNodeTestUtils.java     |   4 +-
 .../SimpleBlocksMovementsStatusHandler.java     |  88 ++++++++++
 .../server/datanode/TestBPOfferService.java     |  12 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   4 +-
 .../server/datanode/TestDataNodeLifeline.java   |   7 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 .../server/datanode/TestFsDatasetCache.java     |   4 +-
 .../TestStoragePolicySatisfyWorker.java         |  76 +--------
 .../hdfs/server/datanode/TestStorageReport.java |   4 +-
 .../server/namenode/NNThroughputBenchmark.java  |   9 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   4 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   5 +-
 .../namenode/TestNameNodeReconfigure.java       |  17 +-
 .../TestBlockStorageMovementAttemptedItems.java |  88 ++++++----
 .../sps/TestStoragePolicySatisfier.java         |  73 ++++++--
 ...stStoragePolicySatisfierWithStripedFile.java |  40 +++--
 .../sps/TestExternalStoragePolicySatisfier.java |  44 ++---
 42 files changed, 776 insertions(+), 659 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index dcc0705..e4125dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlock import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.VersionRequestProto; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -139,8 +138,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks, - BlocksStorageMoveAttemptFinished storageMovementFinishedBlks) + @Nonnull SlowDiskReports slowDisks) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) @@ -165,13 +163,6 @@ public class DatanodeProtocolClientSideTranslatorPB implements builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks)); } - // Adding blocks movement results to the heart beat request. - if (storageMovementFinishedBlks != null - && storageMovementFinishedBlks.getBlocks() != null) { - builder.setStorageMoveAttemptFinishedBlks( - PBHelper.convertBlksMovReport(storageMovementFinishedBlks)); - } - HeartbeatResponseProto resp; try { resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index b5bb80a..5cba284 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -122,9 +122,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements request.getXceiverCount(), request.getFailedVolumes(), volumeFailureSummary, request.getRequestFullBlockReportLease(), PBHelper.convertSlowPeerInfo(request.getSlowPeersList()), - PBHelper.convertSlowDiskInfo(request.getSlowDisksList()), - PBHelper.convertBlksMovReport( - request.getStorageMoveAttemptFinishedBlks())); + PBHelper.convertSlowDiskInfo(request.getSlowDisksList())); } catch (IOException e) { throw new ServiceException(e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 38f72c0..f51f839 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerRepo import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockStorageMovementCommandProto; -import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlocksStorageMoveAttemptFinishedProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto; @@ -105,7 +104,6 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; @@ -971,29 +969,6 @@ public class PBHelper { return SlowDiskReports.create(slowDisksMap); } - public static BlocksStorageMoveAttemptFinished convertBlksMovReport( - BlocksStorageMoveAttemptFinishedProto proto) { - - List<BlockProto> blocksList = proto.getBlocksList(); - Block[] blocks = new Block[blocksList.size()]; - for (int i = 0; i < blocksList.size(); i++) { - BlockProto blkProto = blocksList.get(i); - blocks[i] = PBHelperClient.convert(blkProto); - } - return new BlocksStorageMoveAttemptFinished(blocks); - } - - public static BlocksStorageMoveAttemptFinishedProto convertBlksMovReport( - BlocksStorageMoveAttemptFinished blocksMoveAttemptFinished) { - BlocksStorageMoveAttemptFinishedProto.Builder builder = - BlocksStorageMoveAttemptFinishedProto.newBuilder(); - Block[] blocks = blocksMoveAttemptFinished.getBlocks(); - for (Block block : blocks) { - builder.addBlocks(PBHelperClient.convert(block)); - } - return builder.build(); - } - public static JournalInfo convert(JournalInfoProto info) { int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int nsID = info.hasNamespaceID() ? 
info.getNamespaceID() : 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 7e0c943..caf250f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; @@ -92,6 +93,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.Namesystem; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; +import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; @@ -427,8 +429,11 @@ public class BlockManager implements BlockStatsMXBean { private final BlockIdManager blockIdManager; - /** For satisfying block storage policies. */ - private final StoragePolicySatisfyManager spsManager; + /** + * For satisfying block storage policies. Instantiates if sps is enabled + * internally or externally. + */ + private StoragePolicySatisfyManager spsManager; /** Minimum live replicas needed for the datanode to be transitioned * from ENTERING_MAINTENANCE to IN_MAINTENANCE. @@ -469,8 +474,7 @@ public class BlockManager implements BlockStatsMXBean { DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); - // sps manager manages the user invoked sps paths and does the movement. 
- spsManager = new StoragePolicySatisfyManager(conf, namesystem, this); + createSPSManager(conf); blockTokenSecretManager = createBlockTokenSecretManager(conf); @@ -699,7 +703,9 @@ public class BlockManager implements BlockStatsMXBean { } public void close() { - getSPSManager().stop(); + if (getSPSManager() != null) { + getSPSManager().stop(); + } bmSafeMode.close(); try { redundancyThread.interrupt(); @@ -713,7 +719,9 @@ public class BlockManager implements BlockStatsMXBean { datanodeManager.close(); pendingReconstruction.stop(); blocksMap.close(); - getSPSManager().stopGracefully(); + if (getSPSManager() != null) { + getSPSManager().stopGracefully(); + } } /** @return the datanodeManager */ @@ -3881,6 +3889,21 @@ public class BlockManager implements BlockStatsMXBean { } processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); + + // notify SPS about the reported block + notifyStorageMovementAttemptFinishedBlk(storageInfo, block); + } + + private void notifyStorageMovementAttemptFinishedBlk( + DatanodeStorageInfo storageInfo, Block block) { + if (getSPSManager() != null) { + SPSService<Long> sps = getSPSManager().getInternalSPSService(); + if (sps.isRunning()) { + sps.notifyStorageMovementAttemptFinishedBlk( + storageInfo.getDatanodeDescriptor(), storageInfo.getStorageType(), + block); + } + } } private void processAndHandleReportedBlock( @@ -5018,6 +5041,57 @@ public class BlockManager implements BlockStatsMXBean { } /** + * Create SPS manager instance. It manages the user invoked sps paths and does + * the movement. + * + * @param conf + * configuration + * @return true if the instance is successfully created, false otherwise. + */ + private boolean createSPSManager(final Configuration conf) { + return createSPSManager(conf, null); + } + + /** + * Create SPS manager instance. It manages the user invoked sps paths and does + * the movement. + * + * @param conf + * configuration + * @param spsMode + * satisfier mode + * @return true if the instance is successfully created, false otherwise. + */ + public boolean createSPSManager(final Configuration conf, + final String spsMode) { + // sps manager manages the user invoked sps paths and does the movement. + // StoragePolicySatisfier(SPS) configs + boolean storagePolicyEnabled = conf.getBoolean( + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DEFAULT); + String modeVal = spsMode; + if (org.apache.commons.lang.StringUtils.isBlank(modeVal)) { + modeVal = conf.get(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_DEFAULT); + } + StoragePolicySatisfierMode mode = StoragePolicySatisfierMode + .fromString(modeVal); + if (!storagePolicyEnabled || mode == StoragePolicySatisfierMode.NONE) { + LOG.info("Storage policy satisfier is disabled"); + return false; + } + spsManager = new StoragePolicySatisfyManager(conf, namesystem, this); + return true; + } + + /** + * Nullify SPS manager as this feature is disabled fully. + */ + public void disableSPS() { + spsManager = null; + } + + /** * @return sps manager. 
*/ public StoragePolicySatisfyManager getSPSManager() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java index 419d806..29c5e9c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockMovementAttemptFinished.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.common.sps; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -33,6 +34,7 @@ public class BlockMovementAttemptFinished { private final Block block; private final DatanodeInfo src; private final DatanodeInfo target; + private final StorageType targetType; private final BlockMovementStatus status; /** @@ -44,14 +46,17 @@ public class BlockMovementAttemptFinished { * src datanode * @param target * target datanode + * @param targetType + * target storage type * @param status * movement status */ public BlockMovementAttemptFinished(Block block, DatanodeInfo src, - DatanodeInfo target, BlockMovementStatus status) { + DatanodeInfo target, StorageType targetType, BlockMovementStatus status) { this.block = block; this.src = src; this.target = target; + this.targetType = targetType; this.status = status; } @@ -64,6 +69,20 @@ public class BlockMovementAttemptFinished { } /** + * @return the target datanode where it moved the block. + */ + public DatanodeInfo getTargetDatanode() { + return target; + } + + /** + * @return target storage type. + */ + public StorageType getTargetType() { + return targetType; + } + + /** * @return block movement status code. 
*/ public BlockMovementStatus getStatus() { @@ -74,7 +93,8 @@ public class BlockMovementAttemptFinished { public String toString() { return new StringBuilder().append("Block movement attempt finished(\n ") .append(" block : ").append(block).append(" src node: ").append(src) - .append(" target node: ").append(target).append(" movement status: ") + .append(" target node: ").append(target).append(" target type: ") + .append(targetType).append(" movement status: ") .append(status).append(")").toString(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java index b20d6cf..4ee415e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlockStorageMovementTracker.java @@ -17,17 +17,12 @@ */ package org.apache.hadoop.hdfs.server.common.sps; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.Block; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,13 +34,10 @@ import org.slf4j.LoggerFactory; public class BlockStorageMovementTracker implements Runnable { private static final Logger LOG = LoggerFactory .getLogger(BlockStorageMovementTracker.class); - private final CompletionService<BlockMovementAttemptFinished> moverCompletionService; + private final CompletionService<BlockMovementAttemptFinished> + moverCompletionService; private final BlocksMovementsStatusHandler blksMovementsStatusHandler; - // Keeps the information - block vs its list of future move tasks - private final Map<Block, List<Future<BlockMovementAttemptFinished>>> moverTaskFutures; - private final Map<Block, List<BlockMovementAttemptFinished>> movementResults; - private volatile boolean running = true; /** @@ -60,53 +52,21 @@ public class BlockStorageMovementTracker implements Runnable { CompletionService<BlockMovementAttemptFinished> moverCompletionService, BlocksMovementsStatusHandler handler) { this.moverCompletionService = moverCompletionService; - this.moverTaskFutures = new HashMap<>(); this.blksMovementsStatusHandler = handler; - this.movementResults = new HashMap<>(); } @Override public void run() { while (running) { - if (moverTaskFutures.size() <= 0) { - try { - synchronized (moverTaskFutures) { - // Waiting for mover tasks. - moverTaskFutures.wait(2000); - } - } catch (InterruptedException ignore) { - // Sets interrupt flag of this thread. 
- Thread.currentThread().interrupt(); - } - } try { - Future<BlockMovementAttemptFinished> future = - moverCompletionService.take(); + Future<BlockMovementAttemptFinished> future = moverCompletionService + .take(); if (future != null) { BlockMovementAttemptFinished result = future.get(); LOG.debug("Completed block movement. {}", result); - Block block = result.getBlock(); - List<Future<BlockMovementAttemptFinished>> blocksMoving = - moverTaskFutures.get(block); - if (blocksMoving == null) { - LOG.warn("Future task doesn't exist for block : {} ", block); - continue; - } - blocksMoving.remove(future); - - List<BlockMovementAttemptFinished> resultPerTrackIdList = - addMovementResultToBlockIdList(result); - - // Completed all the scheduled blocks movement under this 'trackId'. - if (blocksMoving.isEmpty() || moverTaskFutures.get(block) == null) { - synchronized (moverTaskFutures) { - moverTaskFutures.remove(block); - } - if (running) { - // handle completed or inprogress blocks movements per trackId. - blksMovementsStatusHandler.handle(resultPerTrackIdList); - } - movementResults.remove(block); + if (running && blksMovementsStatusHandler != null) { + // handle completed block movement. + blksMovementsStatusHandler.handle(result); } } } catch (InterruptedException e) { @@ -122,63 +82,10 @@ public class BlockStorageMovementTracker implements Runnable { } } - private List<BlockMovementAttemptFinished> addMovementResultToBlockIdList( - BlockMovementAttemptFinished result) { - Block block = result.getBlock(); - List<BlockMovementAttemptFinished> perBlockIdList; - synchronized (movementResults) { - perBlockIdList = movementResults.get(block); - if (perBlockIdList == null) { - perBlockIdList = new ArrayList<>(); - movementResults.put(block, perBlockIdList); - } - perBlockIdList.add(result); - } - return perBlockIdList; - } - - /** - * Add future task to the tracking list to check the completion status of the - * block movement. - * - * @param blockID - * block identifier - * @param futureTask - * future task used for moving the respective block - */ - public void addBlock(Block block, - Future<BlockMovementAttemptFinished> futureTask) { - synchronized (moverTaskFutures) { - List<Future<BlockMovementAttemptFinished>> futures = - moverTaskFutures.get(block); - // null for the first task - if (futures == null) { - futures = new ArrayList<>(); - moverTaskFutures.put(block, futures); - } - futures.add(futureTask); - // Notify waiting tracker thread about the newly added tasks. - moverTaskFutures.notify(); - } - } - - /** - * Clear the pending movement and movement result queues. - */ - public void removeAll() { - synchronized (moverTaskFutures) { - moverTaskFutures.clear(); - } - synchronized (movementResults) { - movementResults.clear(); - } - } - /** - * Sets running flag to false and clear the pending movement result queues. + * Sets running flag to false. 
*/ public void stopTracking() { running = false; - removeAll(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java index f9f3954..ab67424 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/sps/BlocksMovementsStatusHandler.java @@ -18,78 +18,22 @@ package org.apache.hadoop.hdfs.server.common.sps; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.Block; /** - * Blocks movements status handler, which is used to collect details of the - * completed block movements and later these attempted finished(with success or - * failure) blocks can be accessed to notify respective listeners, if any. + * Blocks movements status handler, which can be used to collect details of the + * completed block movements. */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class BlocksMovementsStatusHandler { - private final List<Block> blockIdVsMovementStatus = new ArrayList<>(); - - /** - * Collect all the storage movement attempt finished blocks. Later this will - * be send to namenode via heart beat. - * - * @param moveAttemptFinishedBlks - * set of storage movement attempt finished blocks - */ - public void handle( - List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) { - List<Block> blocks = new ArrayList<>(); - - for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) { - blocks.add(item.getBlock()); - } - // Adding to the tracking report list. Later this can be accessed to know - // the attempted block movements. - synchronized (blockIdVsMovementStatus) { - blockIdVsMovementStatus.addAll(blocks); - } - } +public interface BlocksMovementsStatusHandler { /** - * @return unmodifiable list of storage movement attempt finished blocks. - */ - public List<Block> getMoveAttemptFinishedBlocks() { - List<Block> moveAttemptFinishedBlks = new ArrayList<>(); - // 1. Adding all the completed block ids. - synchronized (blockIdVsMovementStatus) { - if (blockIdVsMovementStatus.size() > 0) { - moveAttemptFinishedBlks = Collections - .unmodifiableList(blockIdVsMovementStatus); - } - } - return moveAttemptFinishedBlks; - } - - /** - * Remove the storage movement attempt finished blocks from the tracking list. + * Collect all the storage movement attempt finished blocks. * - * @param moveAttemptFinishedBlks - * set of storage movement attempt finished blocks - */ - public void remove(List<Block> moveAttemptFinishedBlks) { - if (moveAttemptFinishedBlks != null) { - blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks); - } - } - - /** - * Clear the blockID vs movement status tracking map. 
+ * @param moveAttemptFinishedBlk + * storage movement attempt finished block */ - public void removeAll() { - synchronized (blockIdVsMovementStatus) { - blockIdVsMovementStatus.clear(); - } - } + void handle(BlockMovementAttemptFinished moveAttemptFinishedBlk); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index b7beda4..dab8ae9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -514,12 +514,6 @@ class BPServiceActor implements Runnable { SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) : SlowDiskReports.EMPTY_REPORT; - // Get the blocks storage move attempt finished blocks - List<Block> results = dn.getStoragePolicySatisfyWorker() - .getBlocksMovementsStatusHandler().getMoveAttemptFinishedBlocks(); - BlocksStorageMoveAttemptFinished storageMoveAttemptFinishedBlks = - getStorageMoveAttemptFinishedBlocks(results); - HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), @@ -530,19 +524,13 @@ class BPServiceActor implements Runnable { volumeFailureSummary, requestBlockReportLease, slowPeers, - slowDisks, - storageMoveAttemptFinishedBlks); + slowDisks); if (outliersReportDue) { // If the report was due and successfully sent, schedule the next one. scheduler.scheduleNextOutlierReport(); } - // Remove the blocks movement results after successfully transferring - // to namenode. - dn.getStoragePolicySatisfyWorker().getBlocksMovementsStatusHandler() - .remove(results); - return response; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 196d4c7..565faec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1424,7 +1424,7 @@ public class DataNode extends ReconfigurableBase ecWorker = new ErasureCodingWorker(getConf(), this); blockRecoveryWorker = new BlockRecoveryWorker(this); storagePolicySatisfyWorker = - new StoragePolicySatisfyWorker(getConf(), this); + new StoragePolicySatisfyWorker(getConf(), this, null); storagePolicySatisfyWorker.start(); blockPoolManager = new BlockPoolManager(this); @@ -2134,11 +2134,6 @@ public class DataNode extends ReconfigurableBase notifyAll(); } tracer.close(); - - // Waiting to finish SPS worker thread. 
- if (storagePolicySatisfyWorker != null) { - storagePolicySatisfyWorker.waitToFinishWorkerThread(); - } } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java index af6137c..0157205 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java @@ -24,7 +24,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -38,19 +37,17 @@ import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished; import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus; import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker; import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler; -import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - /** * StoragePolicySatisfyWorker handles the storage policy satisfier commands. * These commands would be issued from NameNode as part of Datanode's heart beat @@ -67,19 +64,19 @@ public class StoragePolicySatisfyWorker { private final int moverThreads; private final ExecutorService moveExecutor; - private final CompletionService<BlockMovementAttemptFinished> moverCompletionService; - private final BlocksMovementsStatusHandler handler; + private final CompletionService<BlockMovementAttemptFinished> + moverCompletionService; private final BlockStorageMovementTracker movementTracker; private Daemon movementTrackerThread; private final BlockDispatcher blkDispatcher; - public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) { + public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode, + BlocksMovementsStatusHandler handler) { this.datanode = datanode; - // Defaulting to 10. This is to minimise the number of move ops. + // Defaulting to 10. This is to minimize the number of move ops. 
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 10); moveExecutor = initializeBlockMoverThreadPool(moverThreads); moverCompletionService = new ExecutorCompletionService<>(moveExecutor); - handler = new BlocksMovementsStatusHandler(); movementTracker = new BlockStorageMovementTracker(moverCompletionService, handler); movementTrackerThread = new Daemon(movementTracker); @@ -88,7 +85,6 @@ public class StoragePolicySatisfyWorker { int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf); blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(), ioFileBufferSize, dnConf.getConnectToDnViaHostname()); - // TODO: Needs to manage the number of concurrent moves per DataNode. } /** @@ -100,22 +96,17 @@ public class StoragePolicySatisfyWorker { } /** - * Stop StoragePolicySatisfyWorker, which will stop block movement tracker - * thread. + * Stop StoragePolicySatisfyWorker, which will terminate executor service and + * stop block movement tracker thread. */ void stop() { movementTracker.stopTracking(); movementTrackerThread.interrupt(); - } - - /** - * Timed wait to stop BlockStorageMovement tracker daemon thread. - */ - void waitToFinishWorkerThread() { + moveExecutor.shutdown(); try { - movementTrackerThread.join(3000); - } catch (InterruptedException ignore) { - // ignore + moveExecutor.awaitTermination(500, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.error("Interrupted while waiting for mover thread to terminate", e); } } @@ -160,10 +151,7 @@ public class StoragePolicySatisfyWorker { : "Source and Target storage type shouldn't be same!"; BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID, blkMovingInfo); - Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService - .submit(blockMovingTask); - movementTracker.addBlock(blkMovingInfo.getBlock(), - moveCallable); + moverCompletionService.submit(blockMovingTask); } } @@ -185,7 +173,8 @@ public class StoragePolicySatisfyWorker { public BlockMovementAttemptFinished call() { BlockMovementStatus status = moveBlock(); return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(), - blkMovingInfo.getSource(), blkMovingInfo.getTarget(), status); + blkMovingInfo.getSource(), blkMovingInfo.getTarget(), + blkMovingInfo.getTargetStorageType(), status); } private BlockMovementStatus moveBlock() { @@ -217,11 +206,6 @@ public class StoragePolicySatisfyWorker { } } - @VisibleForTesting - BlocksMovementsStatusHandler getBlocksMovementsStatusHandler() { - return handler; - } - /** * Drop the in-progress SPS work queues. */ @@ -229,7 +213,5 @@ public class StoragePolicySatisfyWorker { LOG.info("Received request to drop StoragePolicySatisfierWorker queues. 
" + "So, none of the SPS Worker queued block movements will" + " be scheduled."); - movementTracker.removeAll(); - handler.removeAll(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java index 45d6218..3f873d7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirSatisfyStoragePolicyOp.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import com.google.common.collect.Lists; @@ -102,7 +103,11 @@ final class FSDirSatisfyStoragePolicyOp { // Adding directory in the pending queue, so FileInodeIdCollector // process directory child in batch and recursively - fsd.getBlockManager().getSPSManager().addPathId(inode.getId()); + StoragePolicySatisfyManager spsManager = + fsd.getBlockManager().getSPSManager(); + if (spsManager != null) { + spsManager.addPathId(inode.getId()); + } } } finally { fsd.writeUnlock(); @@ -116,7 +121,11 @@ final class FSDirSatisfyStoragePolicyOp { } else { // Adding directory in the pending queue, so FileInodeIdCollector process // directory child in batch and recursively - fsd.getBlockManager().getSPSManager().addPathId(inode.getId()); + StoragePolicySatisfyManager spsManager = + fsd.getBlockManager().getSPSManager(); + if (spsManager != null) { + spsManager.addPathId(inode.getId()); + } return true; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java index 1150a72..3b68979 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ReencryptionInfoProto; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import org.apache.hadoop.security.AccessControlException; import java.io.FileNotFoundException; @@ -209,8 +210,11 @@ class FSDirXAttrOp { for (XAttr xattr : toRemove) { if (XATTR_SATISFY_STORAGE_POLICY .equals(XAttrHelper.getPrefixedName(xattr))) { - fsd.getBlockManager().getSPSManager().getInternalSPSService() - .clearQueue(inode.getId()); + 
StoragePolicySatisfyManager spsManager = + fsd.getBlockManager().getSPSManager(); + if (spsManager != null) { + spsManager.getInternalSPSService().clearQueue(inode.getId()); + } break; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 6539b51..2a976d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo.UpdatedReplicationInfo; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import org.apache.hadoop.hdfs.util.ByteArray; import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.ReadOnlyList; @@ -1401,7 +1402,9 @@ public class FSDirectory implements Closeable { if (!inode.isSymlink()) { final XAttrFeature xaf = inode.getXAttrFeature(); addEncryptionZone((INodeWithAdditionalFields) inode, xaf); - if (namesystem.getBlockManager().getSPSManager().isEnabled()) { + StoragePolicySatisfyManager spsManager = + namesystem.getBlockManager().getSPSManager(); + if (spsManager != null && spsManager.isEnabled()) { addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 9c44079..0e33234 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -259,7 +259,6 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.snapshot.DirectorySnapshottableFeature; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotManager; -import org.apache.hadoop.hdfs.server.namenode.sps.SPSService; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase; import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress; import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step; @@ -268,7 +267,6 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger; import org.apache.hadoop.hdfs.server.namenode.top.TopConf; import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics; import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager; -import 
org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -1288,7 +1286,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir, edekCacheLoaderDelay, edekCacheLoaderInterval); } - blockManager.getSPSManager().start(); + if (blockManager.getSPSManager() != null) { + blockManager.getSPSManager().start(); + } } finally { startingActiveService = false; blockManager.checkSafeMode(); @@ -1318,7 +1318,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, LOG.info("Stopping services started for active state"); writeLock(); try { - if (blockManager != null) { + if (blockManager != null && blockManager.getSPSManager() != null) { blockManager.getSPSManager().stop(); } stopSecretManager(); @@ -1359,7 +1359,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // Don't want to keep replication queues when not in Active. blockManager.clearQueues(); blockManager.setInitializedReplQueues(false); - blockManager.getSPSManager().stopGracefully(); + if (blockManager.getSPSManager() != null) { + blockManager.getSPSManager().stopGracefully(); + } } } finally { writeUnlock("stopActiveServices"); @@ -2268,7 +2270,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, DFS_STORAGE_POLICY_ENABLED_KEY)); } // checks sps status - if (!blockManager.getSPSManager().isEnabled() || (blockManager + boolean disabled = (blockManager.getSPSManager() == null); + if (disabled || (blockManager .getSPSManager().getMode() == StoragePolicySatisfierMode.INTERNAL && !blockManager.getSPSManager().isInternalSatisfierRunning())) { throw new UnsupportedActionException( @@ -3966,8 +3969,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks, - BlocksStorageMoveAttemptFinished blksMovementsFinished) + @Nonnull SlowDiskReports slowDisks) throws IOException { readLock(); try { @@ -3983,18 +3985,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg); } - // Handle blocks movement results sent by the coordinator datanode. - SPSService sps = blockManager.getSPSManager().getInternalSPSService(); - if (!sps.isRunning()) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Storage policy satisfier is not running. 
So, ignoring storage" - + " movement attempt finished block info sent by DN"); - } - } else { - sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished); - } - //create ha status final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat( haContext.getState().getServiceState(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index b199c72..2087d90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -2148,7 +2148,24 @@ public class NameNode extends ReconfigurableBase implements } StoragePolicySatisfierMode mode = StoragePolicySatisfierMode .fromString(newVal); - namesystem.getBlockManager().getSPSManager().changeModeEvent(mode); + if (mode == StoragePolicySatisfierMode.NONE) { + // disabling sps service + if (namesystem.getBlockManager().getSPSManager() != null) { + namesystem.getBlockManager().getSPSManager().changeModeEvent(mode); + namesystem.getBlockManager().disableSPS(); + } + } else { + // enabling sps service + boolean spsCreated = (namesystem.getBlockManager() + .getSPSManager() != null); + if (!spsCreated) { + spsCreated = namesystem.getBlockManager().createSPSManager(getConf(), + newVal); + } + if (spsCreated) { + namesystem.getBlockManager().getSPSManager().changeModeEvent(mode); + } + } return newVal; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 1590423..57e827d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -156,8 +156,8 @@ import org.apache.hadoop.hdfs.server.common.HttpGetFailedException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfyManager; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; @@ -1517,16 +1517,14 @@ public class NameNodeRpcServer implements NamenodeProtocols { int failedVolumes, VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks, - 
BlocksStorageMoveAttemptFinished storageMovementFinishedBlks) + @Nonnull SlowDiskReports slowDisks) throws IOException { checkNNStartup(); verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress, failedVolumes, volumeFailureSummary, requestFullBlockReportLease, - slowPeers, slowDisks, - storageMovementFinishedBlks); + slowPeers, slowDisks); } @Override // DatanodeProtocol @@ -2543,10 +2541,12 @@ public class NameNodeRpcServer implements NamenodeProtocols { if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - boolean isSPSRunning = namesystem.getBlockManager().getSPSManager() - .isInternalSatisfierRunning(); + StoragePolicySatisfyManager spsMgr = + namesystem.getBlockManager().getSPSManager(); + boolean isInternalSatisfierRunning = (spsMgr != null + ? spsMgr.isInternalSatisfierRunning() : false); namesystem.logAuditEvent(true, operationName, null); - return isSPSRunning; + return isInternalSatisfierRunning; } @Override @@ -2556,6 +2556,14 @@ public class NameNodeRpcServer implements NamenodeProtocols { if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } + if (namesystem.getBlockManager().getSPSManager() == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Satisfier is not running inside namenode, so status " + + "can't be returned."); + } + throw new IOException("Satisfier is not running inside namenode, " + + "so status can't be returned."); + } return namesystem.getBlockManager().getSPSManager() .checkStoragePolicySatisfyPathStatus(path); } @@ -2568,16 +2576,20 @@ public class NameNodeRpcServer implements NamenodeProtocols { if (nn.isStandbyState()) { throw new StandbyException("Not supported by Standby Namenode."); } - // Check that SPS daemon service is running inside namenode - if (namesystem.getBlockManager().getSPSManager() - .getMode() == StoragePolicySatisfierMode.INTERNAL) { - LOG.debug("SPS service is internally enabled and running inside " - + "namenode, so external SPS is not allowed to fetch the path Ids"); - throw new IOException("SPS service is internally enabled and running" - + " inside namenode, so external SPS is not allowed to fetch" - + " the path Ids"); + // Check that SPS is enabled externally + StoragePolicySatisfyManager spsMgr = + namesystem.getBlockManager().getSPSManager(); + StoragePolicySatisfierMode spsMode = (spsMgr != null ? 
spsMgr.getMode() + : StoragePolicySatisfierMode.NONE); + if (spsMode != StoragePolicySatisfierMode.EXTERNAL) { + if (LOG.isDebugEnabled()) { + LOG.debug("SPS service mode is {}, so external SPS service is " + + "not allowed to fetch the path Ids", spsMode); + } + throw new IOException("SPS service mode is " + spsMode + ", so " + + "external SPS service is not allowed to fetch the path Ids"); } - Long pathId = namesystem.getBlockManager().getSPSManager().getNextPathId(); + Long pathId = spsMgr.getNextPathId(); if (pathId == null) { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java index d2f0bb2..5b25491 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/BlockStorageMovementAttemptedItems.java @@ -17,21 +17,28 @@ */ package org.apache.hadoop.hdfs.server.namenode.sps; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY; import static org.apache.hadoop.util.Time.monotonicNow; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY; - +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo; +import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.StorageTypeNodePair; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,10 +67,13 @@ public class BlockStorageMovementAttemptedItems<T> { * processing and sent to DNs. */ private final List<AttemptedItemInfo<T>> storageMovementAttemptedItems; - private final List<Block> movementFinishedBlocks; + private Map<Block, Set<StorageTypeNodePair>> scheduledBlkLocs; + // Maintains separate Queue to keep the movement finished blocks. 
This Q + // is used to update the storageMovementAttemptedItems list asynchronously. + private final BlockingQueue<Block> movementFinishedBlocks; private volatile boolean monitorRunning = true; private Daemon timerThread = null; - private final BlockMovementListener blkMovementListener; + private BlockMovementListener blkMovementListener; // // It might take anywhere between 5 to 10 minutes before // a request is timed out. @@ -94,7 +104,8 @@ public class BlockStorageMovementAttemptedItems<T> { DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT); this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles; storageMovementAttemptedItems = new ArrayList<>(); - movementFinishedBlocks = new ArrayList<>(); + scheduledBlkLocs = new HashMap<>(); + movementFinishedBlocks = new LinkedBlockingQueue<>(); this.blkMovementListener = blockMovementListener; } @@ -105,29 +116,67 @@ public class BlockStorageMovementAttemptedItems<T> { * @param itemInfo * - tracking info */ - public void add(AttemptedItemInfo<T> itemInfo) { + public void add(T startPath, T file, long monotonicNow, + Map<Block, Set<StorageTypeNodePair>> assignedBlocks, int retryCount) { + AttemptedItemInfo<T> itemInfo = new AttemptedItemInfo<T>(startPath, file, + monotonicNow, assignedBlocks.keySet(), retryCount); synchronized (storageMovementAttemptedItems) { storageMovementAttemptedItems.add(itemInfo); } + synchronized (scheduledBlkLocs) { + scheduledBlkLocs.putAll(assignedBlocks); + } } /** - * Add the storage movement attempt finished blocks to - * storageMovementFinishedBlocks. + * Notify the storage movement attempt finished block. * - * @param moveAttemptFinishedBlks - * storage movement attempt finished blocks + * @param reportedDn + * reported datanode + * @param type + * storage type + * @param reportedBlock + * reported block */ - public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) { - if (moveAttemptFinishedBlks.length == 0) { - return; + public void notifyReportedBlock(DatanodeInfo reportedDn, StorageType type, + Block reportedBlock) { + synchronized (scheduledBlkLocs) { + if (scheduledBlkLocs.size() <= 0) { + return; + } + matchesReportedBlock(reportedDn, type, reportedBlock); } - synchronized (movementFinishedBlocks) { - movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks)); + } + + private void matchesReportedBlock(DatanodeInfo reportedDn, StorageType type, + Block reportedBlock) { + Set<StorageTypeNodePair> blkLocs = scheduledBlkLocs.get(reportedBlock); + if (blkLocs == null) { + return; // unknown block, simply skip. } - // External listener if it is plugged-in - if (blkMovementListener != null) { - blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks); + + for (StorageTypeNodePair dn : blkLocs) { + boolean foundDn = dn.getDatanodeInfo().compareTo(reportedDn) == 0 ? true + : false; + boolean foundType = dn.getStorageType().equals(type); + if (foundDn && foundType) { + blkLocs.remove(dn); + // listener if it is plugged-in + if (blkMovementListener != null) { + blkMovementListener + .notifyMovementTriedBlocks(new Block[] {reportedBlock}); + } + // All the block locations has reported. + if (blkLocs.size() <= 0) { + movementFinishedBlocks.add(reportedBlock); + scheduledBlkLocs.remove(reportedBlock); // clean-up reported block + } + return; // found + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Reported block:{} not found in attempted blocks. 
Datanode:{}" + + ", StorageType:{}", reportedBlock, reportedDn, type); } } @@ -203,14 +252,12 @@ public class BlockStorageMovementAttemptedItems<T> { if (now > itemInfo.getLastAttemptedOrReportedTime() + selfRetryTimeout) { T file = itemInfo.getFile(); - synchronized (movementFinishedBlocks) { - ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), - file, itemInfo.getRetryCount() + 1); - blockStorageMovementNeeded.add(candidate); - iter.remove(); - LOG.info("TrackID: {} becomes timed out and moved to needed " - + "retries queue for next iteration.", file); - } + ItemInfo<T> candidate = new ItemInfo<T>(itemInfo.getStartPath(), file, + itemInfo.getRetryCount() + 1); + blockStorageMovementNeeded.add(candidate); + iter.remove(); + LOG.info("TrackID: {} becomes timed out and moved to needed " + + "retries queue for next iteration.", file); } } } @@ -218,29 +265,25 @@ public class BlockStorageMovementAttemptedItems<T> { @VisibleForTesting void blockStorageMovementReportedItemsCheck() throws IOException { - synchronized (movementFinishedBlocks) { - Iterator<Block> finishedBlksIter = movementFinishedBlocks.iterator(); - while (finishedBlksIter.hasNext()) { - Block blk = finishedBlksIter.next(); - synchronized (storageMovementAttemptedItems) { - Iterator<AttemptedItemInfo<T>> iterator = - storageMovementAttemptedItems.iterator(); - while (iterator.hasNext()) { - AttemptedItemInfo<T> attemptedItemInfo = iterator.next(); - attemptedItemInfo.getBlocks().remove(blk); - if (attemptedItemInfo.getBlocks().isEmpty()) { - // TODO: try add this at front of the Queue, so that this element - // gets the chance first and can be cleaned from queue quickly as - // all movements already done. - blockStorageMovementNeeded.add(new ItemInfo<T>(attemptedItemInfo - .getStartPath(), attemptedItemInfo.getFile(), - attemptedItemInfo.getRetryCount() + 1)); - iterator.remove(); - } + // Removes all available blocks from this queue and process it. + Collection<Block> finishedBlks = new ArrayList<>(); + movementFinishedBlocks.drainTo(finishedBlks); + + // Update attempted items list + for (Block blk : finishedBlks) { + synchronized (storageMovementAttemptedItems) { + Iterator<AttemptedItemInfo<T>> iterator = storageMovementAttemptedItems + .iterator(); + while (iterator.hasNext()) { + AttemptedItemInfo<T> attemptedItemInfo = iterator.next(); + attemptedItemInfo.getBlocks().remove(blk); + if (attemptedItemInfo.getBlocks().isEmpty()) { + blockStorageMovementNeeded.add(new ItemInfo<T>( + attemptedItemInfo.getStartPath(), attemptedItemInfo.getFile(), + attemptedItemInfo.getRetryCount() + 1)); + iterator.remove(); } } - // Remove attempted blocks from movementFinishedBlocks list. - finishedBlksIter.remove(); } } } @@ -252,15 +295,29 @@ public class BlockStorageMovementAttemptedItems<T> { @VisibleForTesting public int getAttemptedItemsCount() { - return storageMovementAttemptedItems.size(); + synchronized (storageMovementAttemptedItems) { + return storageMovementAttemptedItems.size(); + } } public void clearQueues() { - synchronized (movementFinishedBlocks) { - movementFinishedBlocks.clear(); - } + movementFinishedBlocks.clear(); synchronized (storageMovementAttemptedItems) { storageMovementAttemptedItems.clear(); } + synchronized (scheduledBlkLocs) { + scheduledBlkLocs.clear(); + } + } + + /** + * Sets external listener for testing. 
+ * + * @param blkMoveListener + * block movement listener callback object + */ + @VisibleForTesting + void setBlockMovementListener(BlockMovementListener blkMoveListener) { + this.blkMovementListener = blkMoveListener; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java index 71d8fd1..5032377 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSService.java @@ -22,8 +22,10 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; /** * An interface for SPSService, which exposes life cycle and processing APIs. @@ -131,11 +133,16 @@ public interface SPSService<T> { void markScanCompletedForPath(T spsPath); /** - * Notify the details of storage movement attempt finished blocks. + * The given datanode is reporting that the movement attempt for the given + * block has finished.
* - * @param moveAttemptFinishedBlks - * - array contains all the blocks that are attempted to move + * @param dnInfo + * - reported datanode + * @param storageType + * - storage type + * @param block + * - block that is attempted to move */ - void notifyStorageMovementAttemptFinishedBlks( - BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks); + void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo, + StorageType storageType, Block block); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java index 1c7a580..cbd6001 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java @@ -24,9 +24,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; @@ -50,7 +53,6 @@ import org.apache.hadoop.hdfs.server.balancer.Matcher; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; -import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.StringUtils; @@ -83,8 +85,6 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { private BlockStorageMovementNeeded<T> storageMovementNeeded; private BlockStorageMovementAttemptedItems<T> storageMovementsMonitor; private volatile boolean isRunning = false; - private volatile StoragePolicySatisfierMode spsMode = - StoragePolicySatisfierMode.NONE; private int spsWorkMultiplier; private long blockCount = 0L; private int blockMovementMaxRetry; @@ -128,11 +128,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { } private Status status = null; - private List<Block> assignedBlocks = null; + private Map<Block, Set<StorageTypeNodePair>> assignedBlocks = null; - BlocksMovingAnalysis(Status status, List<Block> blockMovingInfo) { + BlocksMovingAnalysis(Status status, + Map<Block, Set<StorageTypeNodePair>> assignedBlocks) { this.status = status; - this.assignedBlocks = blockMovingInfo; + this.assignedBlocks = assignedBlocks; } } @@ -164,7 +165,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { serviceMode); return; } - if (spsMode == StoragePolicySatisfierMode.INTERNAL + if (serviceMode == StoragePolicySatisfierMode.INTERNAL && ctxt.isMoverRunning()) { isRunning = false; LOG.error( @@ -175,14 +176,13 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { } if (reconfigStart) { LOG.info("Starting {} StoragePolicySatisfier, as 
admin requested to " - + "start it.", StringUtils.toLowerCase(spsMode.toString())); + + "start it.", StringUtils.toLowerCase(serviceMode.toString())); } else { LOG.info("Starting {} StoragePolicySatisfier.", - StringUtils.toLowerCase(spsMode.toString())); + StringUtils.toLowerCase(serviceMode.toString())); } isRunning = true; - this.spsMode = serviceMode; // Ensure that all the previously submitted block movements(if any) have to // be stopped in all datanodes. addDropSPSWorkCommandsToAllDNs(); @@ -297,36 +297,36 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { // be removed on storage movement attempt finished report. case BLOCKS_TARGETS_PAIRED: if (LOG.isDebugEnabled()) { - LOG.debug("Block analysis status:{} for the file path:{}." + LOG.debug("Block analysis status:{} for the file id:{}." + " Adding to attempt monitor queue for the storage " + "movement attempt finished report", - status.status, fileStatus.getPath()); + status.status, fileStatus.getFileId()); } - this.storageMovementsMonitor.add(new AttemptedItemInfo<T>( - itemInfo.getStartPath(), itemInfo.getFile(), monotonicNow(), - status.assignedBlocks, itemInfo.getRetryCount())); + this.storageMovementsMonitor.add(itemInfo.getStartPath(), + itemInfo.getFile(), monotonicNow(), status.assignedBlocks, + itemInfo.getRetryCount()); break; case NO_BLOCKS_TARGETS_PAIRED: if (LOG.isDebugEnabled()) { - LOG.debug("Adding trackID:{} for the file path:{} back to" + LOG.debug("Adding trackID:{} for the file id:{} back to" + " retry queue as none of the blocks found its eligible" - + " targets.", trackId, fileStatus.getPath()); + + " targets.", trackId, fileStatus.getFileId()); } retryItem = true; break; case FEW_LOW_REDUNDANCY_BLOCKS: if (LOG.isDebugEnabled()) { - LOG.debug("Adding trackID:{} for the file path:{} back to " + LOG.debug("Adding trackID:{} for the file id:{} back to " + "retry queue as some of the blocks are low redundant.", - trackId, fileStatus.getPath()); + trackId, fileStatus.getFileId()); } retryItem = true; break; case BLOCKS_FAILED_TO_MOVE: if (LOG.isDebugEnabled()) { - LOG.debug("Adding trackID:{} for the file path:{} back to " + LOG.debug("Adding trackID:{} for the file id:{} back to " + "retry queue as some of the blocks movement failed.", - trackId, fileStatus.getPath()); + trackId, fileStatus.getFileId()); } retryItem = true; break; @@ -334,9 +334,9 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { case BLOCKS_TARGET_PAIRING_SKIPPED: case BLOCKS_ALREADY_SATISFIED: default: - LOG.info("Block analysis status:{} for the file path:{}." + LOG.info("Block analysis status:{} for the file id:{}." + " So, Cleaning up the Xattrs.", status.status, - fileStatus.getPath()); + fileStatus.getFileId()); storageMovementNeeded.removeItemTrackInfo(itemInfo, true); break; } @@ -389,19 +389,19 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { if (!lastBlkComplete) { // Postpone, currently file is under construction LOG.info("File: {} is under construction. So, postpone" - + " this to the next retry iteration", fileInfo.getPath()); + + " this to the next retry iteration", fileInfo.getFileId()); return new BlocksMovingAnalysis( BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY, - new ArrayList<>()); + new HashMap<>()); } List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks(); if (blocks.size() == 0) { LOG.info("File: {} is not having any blocks." 
- + " So, skipping the analysis.", fileInfo.getPath()); + + " So, skipping the analysis.", fileInfo.getFileId()); return new BlocksMovingAnalysis( BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED, - new ArrayList<>()); + new HashMap<>()); } List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>(); boolean hasLowRedundancyBlocks = false; @@ -432,7 +432,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { + "So, ignoring to move the blocks"); return new BlocksMovingAnalysis( BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED, - new ArrayList<>()); + new HashMap<>()); } } else { expectedStorageTypes = existingStoragePolicy @@ -465,13 +465,21 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { && status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) { status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS; } - List<Block> assignedBlockIds = new ArrayList<Block>(); + Map<Block, Set<StorageTypeNodePair>> assignedBlocks = new HashMap<>(); for (BlockMovingInfo blkMovingInfo : blockMovingInfos) { // Check for at least one block storage movement has been chosen try { blockMoveTaskHandler.submitMoveTask(blkMovingInfo); LOG.debug("BlockMovingInfo: {}", blkMovingInfo); - assignedBlockIds.add(blkMovingInfo.getBlock()); + StorageTypeNodePair nodeStorage = new StorageTypeNodePair( + blkMovingInfo.getTargetStorageType(), blkMovingInfo.getTarget()); + Set<StorageTypeNodePair> nodesWithStorage = assignedBlocks + .get(blkMovingInfo.getBlock()); + if (nodesWithStorage == null) { + nodesWithStorage = new HashSet<>(); + assignedBlocks.put(blkMovingInfo.getBlock(), nodesWithStorage); + } + nodesWithStorage.add(nodeStorage); blockCount++; } catch (IOException e) { LOG.warn("Exception while scheduling movement task", e); @@ -479,7 +487,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { status = BlocksMovingAnalysis.Status.BLOCKS_FAILED_TO_MOVE; } } - return new BlocksMovingAnalysis(status, assignedBlockIds); + return new BlocksMovingAnalysis(status, assignedBlocks); } /** @@ -545,6 +553,11 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { new ArrayList<StorageTypeNodePair>(); List<DatanodeInfo> existingBlockStorages = new ArrayList<DatanodeInfo>( Arrays.asList(storages)); + + // Add existing storages into exclude nodes to avoid choosing this as + // remote target later. + List<DatanodeInfo> excludeNodes = new ArrayList<>(existingBlockStorages); + // if expected type exists in source node already, local movement would be // possible, so lets find such sources first. 
Iterator<DatanodeInfo> iterator = existingBlockStorages.iterator(); @@ -582,7 +595,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { foundMatchingTargetNodesForBlock |= findSourceAndTargetToMove( blockMovingInfos, blockInfo, sourceWithStorageMap, expectedStorageTypes, targetDns, - ecPolicy); + ecPolicy, excludeNodes); } return foundMatchingTargetNodesForBlock; } @@ -601,6 +614,10 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { * - Expecting storages to move * @param targetDns * - Available DNs for expected storage types + * @param ecPolicy + * - erasure coding policy of the sps invoked file + * @param excludeNodes + * - existing source nodes, which have a replica copy * @return false if some of the block locations failed to find target node to * satisfy the storage policy */ @@ -609,9 +626,8 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { List<StorageTypeNodePair> sourceWithStorageList, List<StorageType> expectedTypes, EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> targetDns, - ErasureCodingPolicy ecPolicy) { + ErasureCodingPolicy ecPolicy, List<DatanodeInfo> excludeNodes) { boolean foundMatchingTargetNodesForBlock = true; - List<DatanodeInfo> excludeNodes = new ArrayList<>(); // Looping over all the source node locations and choose the target // storage within same node if possible. This is done separately to @@ -638,10 +654,12 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { expectedTypes.remove(chosenTarget.storageType); } } - // To avoid choosing this excludeNodes as targets later - excludeNodes.add(existingTypeNodePair.dn); } - + // If all the sources and targets are paired within the same node, then + // simply return. + if (expectedTypes.isEmpty()) { + return foundMatchingTargetNodesForBlock; + } // Looping over all the source node locations. Choose a remote target // storage node if it was not found out within same node. for (int i = 0; i < sourceWithStorageList.size(); i++) { @@ -824,14 +842,29 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { /** * Keeps datanode with its respective storage type. */ - private static final class StorageTypeNodePair { + static final class StorageTypeNodePair { private final StorageType storageType; private final DatanodeInfo dn; - private StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) { + StorageTypeNodePair(StorageType storageType, DatanodeInfo dn) { this.storageType = storageType; this.dn = dn; } + + public DatanodeInfo getDatanodeInfo() { + return dn; + } + + public StorageType getStorageType() { + return storageType; + } + + @Override + public String toString() { + return new StringBuilder().append("StorageTypeNodePair(") + .append("DatanodeInfo: ").append(dn).append(", StorageType: ") + .append(storageType).append(")").toString(); + } } private EnumMap<StorageType, List<DatanodeWithStorage.StorageDetails>> @@ -1043,18 +1076,19 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { } /** - * Receives set of storage movement attempt finished blocks report. + * Receives a storage movement attempt finished block report. + * + * @param dnInfo + * reported datanode + * @param storageType + * storage type + * @param block + * movement attempt finished block.
*/ - public void notifyStorageMovementAttemptFinishedBlks( - BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) { - if (moveAttemptFinishedBlks.getBlocks().length <= 0) { - return; - } - storageMovementsMonitor - .notifyMovementTriedBlocks(moveAttemptFinishedBlks.getBlocks()); + @Override + public void notifyStorageMovementAttemptFinishedBlk(DatanodeInfo dnInfo, + StorageType storageType, Block block) { + storageMovementsMonitor.notifyReportedBlock(dnInfo, storageType, block); } @VisibleForTesting @@ -1086,7 +1120,7 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { */ final static class AttemptedItemInfo<T> extends ItemInfo<T> { private long lastAttemptedOrReportedTime; - private final List<Block> blocks; + private final Set<Block> blocks; /** * AttemptedItemInfo constructor. @@ -1097,10 +1131,14 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { * trackId for file. * @param lastAttemptedOrReportedTime * last attempted or reported time + * @param blocks + * scheduled blocks + * @param retryCount + * file retry count */ AttemptedItemInfo(T rootId, T trackId, long lastAttemptedOrReportedTime, - List<Block> blocks, int retryCount) { + Set<Block> blocks, int retryCount) { super(rootId, trackId, retryCount); this.lastAttemptedOrReportedTime = lastAttemptedOrReportedTime; this.blocks = blocks; @@ -1121,10 +1159,9 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { this.lastAttemptedOrReportedTime = monotonicNow(); } - List<Block> getBlocks() { + Set<Block> getBlocks() { return this.blocks; } - } /** @@ -1241,4 +1278,15 @@ public class StoragePolicySatisfier<T> implements SPSService<T>, Runnable { "It should be a positive, non-zero integer value."); return spsWorkMultiplier; } + + /** + * Sets external listener for testing. + * + * @param blkMovementListener + * block movement listener callback object + */ + @VisibleForTesting + void setBlockMovementListener(BlockMovementListener blkMovementListener) { + storageMovementsMonitor.setBlockMovementListener(blkMovementListener); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/8bc72521/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index fcc2df1..311b68f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -112,7 +112,6 @@ public interface DatanodeProtocol { * @param slowPeers Details of peer DataNodes that were detected as being * slow to respond to packet writes. Empty report if no * slow peers were detected by the DataNode. 
- * @param storageMovFinishedBlks array of movement attempt finished blocks * @throws IOException on error */ @Idempotent @@ -126,8 +125,7 @@ public interface DatanodeProtocol { VolumeFailureSummary volumeFailureSummary, boolean requestFullBlockReportLease, @Nonnull SlowPeerReports slowPeers, - @Nonnull SlowDiskReports slowDisks, - BlocksStorageMoveAttemptFinished storageMovFinishedBlks) + @Nonnull SlowDiskReports slowDisks) throws IOException; /**
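
A note on the core bookkeeping in BlockStorageMovementAttemptedItems above: when moves are scheduled for a file, every (block, target datanode, target storage type) assignment is recorded in scheduledBlkLocs; each block reported back through an IBR removes its matching pair, and a block whose pending set drains to empty is queued on movementFinishedBlocks. The minimal sketch below illustrates that mechanism in isolation; the long block ids and "datanode:storageType" strings are simplified stand-ins for the real Block, DatanodeInfo and StorageType classes, so it compiles on its own but is not the HDFS code.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MoveTrackerSketch {
  // Block id -> (datanode:storageType) locations still expected to report.
  private final Map<Long, Set<String>> scheduledBlkLocs = new HashMap<>();
  // Blocks whose every scheduled location has reported back.
  private final BlockingQueue<Long> movementFinishedBlocks =
      new LinkedBlockingQueue<>();

  // Analogous to BlockStorageMovementAttemptedItems#add().
  public synchronized void schedule(long blockId, Set<String> targets) {
    scheduledBlkLocs.put(blockId, new HashSet<>(targets));
  }

  // Analogous to notifyReportedBlock(): one IBR-reported replica at a time.
  public synchronized void notifyReported(long blockId, String location) {
    Set<String> pending = scheduledBlkLocs.get(blockId);
    if (pending == null || !pending.remove(location)) {
      return; // unknown block or unexpected location: skip.
    }
    if (pending.isEmpty()) {
      movementFinishedBlocks.add(blockId); // all locations have reported.
      scheduledBlkLocs.remove(blockId);
    }
  }

  public static void main(String[] args) {
    MoveTrackerSketch t = new MoveTrackerSketch();
    t.schedule(1L, Set.of("dn1:ARCHIVE", "dn2:ARCHIVE"));
    t.notifyReported(1L, "dn1:ARCHIVE"); // first replica reported
    t.notifyReported(1L, "dn2:ARCHIVE"); // second one: block now finished
    System.out.println(t.movementFinishedBlocks); // prints [1]
  }
}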
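
The monitor thread consumes that queue in batches, which is why movementFinishedBlocks became a LinkedBlockingQueue: BlockingQueue#drainTo transfers everything currently queued into a local collection in one non-blocking call, so the per-block report handlers can keep appending while the previous batch is processed, without holding a shared lock across the whole iteration. A stand-alone demonstration of the pattern (names are illustrative only):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainToDemo {
  public static void main(String[] args) {
    BlockingQueue<Long> finished = new LinkedBlockingQueue<>();
    finished.add(100L);
    finished.add(101L);

    // One bulk transfer; producers may continue adding concurrently.
    List<Long> batch = new ArrayList<>();
    finished.drainTo(batch);

    for (Long blk : batch) {
      System.out.println("clearing attempted-items entry for block " + blk);
    }
  }
}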
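
Finally, since per-block results now flow through notifyReportedBlock rather than a bulk heartbeat payload, tests can observe movements via the BlockMovementListener hook exposed through the new @VisibleForTesting setters. The sketch below shows what a recording listener could look like; the callback shape mirrors notifyMovementTriedBlocks from the patch (with long ids standing in for Block), and the listener class itself is hypothetical:

import java.util.ArrayList;
import java.util.List;

public class RecordingListenerSketch {
  /** Simplified analogue of the patch's BlockMovementListener callback. */
  interface BlockMovementListener {
    void notifyMovementTriedBlocks(long[] moveAttemptFinishedBlks);
  }

  /** Records every block reported as movement-attempt-finished. */
  static class RecordingListener implements BlockMovementListener {
    private final List<Long> seen = new ArrayList<>();

    @Override
    public void notifyMovementTriedBlocks(long[] blocks) {
      for (long b : blocks) {
        seen.add(b);
      }
    }

    List<Long> seenBlocks() {
      return seen;
    }
  }

  public static void main(String[] args) {
    RecordingListener listener = new RecordingListener();
    listener.notifyMovementTriedBlocks(new long[] {42L});
    System.out.println(listener.seenBlocks()); // prints [42]
  }
}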