Repository: hadoop
Updated Branches:
  refs/heads/branch-2 8ffe86f78 -> 0d8a35bd6
HDFS-8674. Improve performance of postponed block scans. Contributed by Daryn Sharp. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d8a35bd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d8a35bd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d8a35bd Branch: refs/heads/branch-2 Commit: 0d8a35bd6de5d2a5a9b816ca98f31975e94bd7c6 Parents: 8ffe86f Author: Kihwal Lee <kih...@apache.org> Authored: Thu Dec 1 12:15:15 2016 -0600 Committer: Kihwal Lee <kih...@apache.org> Committed: Thu Dec 1 12:15:15 2016 -0600 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 76 +++++++------------- 1 file changed, 24 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d8a35bd/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 954b297..f2805e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -29,6 +29,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -170,7 +171,6 @@ public class BlockManager implements BlockStatsMXBean { private boolean initializedReplQueues; private final AtomicLong excessBlocksCount = new 
AtomicLong(0L); - private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L); private final long startupDelayBlockDeletionInMs; private final BlockReportLeaseManager blockReportLeaseManager; private ObjectName mxBeanName; @@ -205,7 +205,7 @@ public class BlockManager implements BlockStatsMXBean { } /** Used by metrics */ public long getPostponedMisreplicatedBlocksCount() { - return postponedMisreplicatedBlocksCount.get(); + return postponedMisreplicatedBlocks.size(); } /** Used by metrics */ public int getPendingDataNodeMessageCount() { @@ -245,8 +245,10 @@ public class BlockManager implements BlockStatsMXBean { * notified of all block deletions that might have been pending * when the failover happened. */ - private final LightWeightHashSet<Block> postponedMisreplicatedBlocks = - new LightWeightHashSet<>(); + private final LinkedHashSet<Block> postponedMisreplicatedBlocks = + new LinkedHashSet<Block>(); + private final int blocksPerPostpondedRescan; + private final ArrayList<Block> rescannedMisreplicatedBlocks; /** * Maps a StorageID to the set of blocks that are "extra" for this @@ -345,7 +347,10 @@ public class BlockManager implements BlockStatsMXBean { datanodeManager = new DatanodeManager(this, namesystem, conf); heartbeatManager = datanodeManager.getHeartbeatManager(); this.blockIdManager = new BlockIdManager(this); - + blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE, + datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan()); + rescannedMisreplicatedBlocks = + new ArrayList<Block>(blocksPerPostpondedRescan); startupDelayBlockDeletionInMs = conf.getLong( DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L; @@ -1455,9 +1460,7 @@ public class BlockManager implements BlockStatsMXBean { private void postponeBlock(Block blk) { - if (postponedMisreplicatedBlocks.add(blk)) { - postponedMisreplicatedBlocksCount.incrementAndGet(); - } + 
postponedMisreplicatedBlocks.add(blk); } @@ -2164,39 +2167,14 @@ public class BlockManager implements BlockStatsMXBean { if (getPostponedMisreplicatedBlocksCount() == 0) { return; } - long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow(); namesystem.writeLock(); - long startPostponedMisReplicatedBlocksCount = - getPostponedMisreplicatedBlocksCount(); + long startTime = Time.monotonicNow(); + long startSize = postponedMisreplicatedBlocks.size(); try { - // blocksPerRescan is the configured number of blocks per rescan. - // Randomly select blocksPerRescan consecutive blocks from the HashSet - // when the number of blocks remaining is larger than blocksPerRescan. - // The reason we don't always pick the first blocksPerRescan blocks is to - // handle the case if for some reason some datanodes remain in - // content stale state for a long time and only impact the first - // blocksPerRescan blocks. - int i = 0; - long startIndex = 0; - long blocksPerRescan = - datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan(); - long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan; - if (base > 0) { - startIndex = ThreadLocalRandom.current().nextLong() % (base+1); - if (startIndex < 0) { - startIndex += (base+1); - } - } Iterator<Block> it = postponedMisreplicatedBlocks.iterator(); - for (int tmp = 0; tmp < startIndex; tmp++) { - it.next(); - } - - for (;it.hasNext(); i++) { + for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) { Block b = it.next(); - if (i >= blocksPerRescan) { - break; - } + it.remove(); BlockInfo bi = blocksMap.getStoredBlock(b); if (bi == null) { @@ -2205,8 +2183,6 @@ public class BlockManager implements BlockStatsMXBean { "Postponed mis-replicated block " + b + " no longer found " + "in block map."); } - it.remove(); - postponedMisreplicatedBlocksCount.decrementAndGet(); continue; } MisReplicationResult res = processMisReplicatedBlock(bi); @@ -2214,20 +2190,19 @@ public class BlockManager implements 
BlockStatsMXBean { LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " + "Re-scanned block " + b + ", result is " + res); } - if (res != MisReplicationResult.POSTPONE) { - it.remove(); - postponedMisreplicatedBlocksCount.decrementAndGet(); + if (res == MisReplicationResult.POSTPONE) { + rescannedMisreplicatedBlocks.add(b); } } } finally { - long endPostponedMisReplicatedBlocksCount = - getPostponedMisreplicatedBlocksCount(); + postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks); + rescannedMisreplicatedBlocks.clear(); + long endSize = postponedMisreplicatedBlocks.size(); namesystem.writeUnlock(); LOG.info("Rescan of postponedMisreplicatedBlocks completed in " + - (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) + - " msecs. " + endPostponedMisReplicatedBlocksCount + - " blocks are left. " + (startPostponedMisReplicatedBlocksCount - - endPostponedMisReplicatedBlocksCount) + " blocks are removed."); + (Time.monotonicNow() - startTime) + " msecs. " + + endSize + " blocks are left. 
" + + (startSize - endSize) + " blocks were removed."); } } @@ -3626,9 +3601,7 @@ public class BlockManager implements BlockStatsMXBean { // Remove the block from pendingReplications and neededReplications pendingReplications.remove(block); neededReplications.remove(block, UnderReplicatedBlocks.LEVEL); - if (postponedMisreplicatedBlocks.remove(block)) { - postponedMisreplicatedBlocksCount.decrementAndGet(); - } + postponedMisreplicatedBlocks.remove(block); } public BlockInfo getStoredBlock(Block block) { @@ -3944,7 +3917,6 @@ public class BlockManager implements BlockStatsMXBean { invalidateBlocks.clear(); datanodeManager.clearPendingQueues(); postponedMisreplicatedBlocks.clear(); - postponedMisreplicatedBlocksCount.set(0); }; public static LocatedBlock newLocatedBlock( --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org