HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6e62a1a6 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6e62a1a6 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6e62a1a6 Branch: refs/heads/trunk Commit: 6e62a1a6728b1f782f64065424f92b292c3f163a Parents: a003f71 Author: Colin Patrick Mccabe <[email protected]> Authored: Wed Dec 17 11:27:48 2014 -0800 Committer: Colin Patrick Mccabe <[email protected]> Committed: Wed Jan 21 19:00:53 2015 -0800 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../hdfs/server/datanode/BPOfferService.java | 3 - .../hdfs/server/datanode/BPServiceActor.java | 6 - .../server/datanode/BlockPoolSliceScanner.java | 872 ------------------- .../hdfs/server/datanode/BlockReceiver.java | 8 - .../hdfs/server/datanode/BlockScanner.java | 308 +++++++ .../hdfs/server/datanode/BlockSender.java | 3 - .../hdfs/server/datanode/DataBlockScanner.java | 339 ------- .../hadoop/hdfs/server/datanode/DataNode.java | 73 +- .../hdfs/server/datanode/VolumeScanner.java | 652 ++++++++++++++ .../server/datanode/fsdataset/FsDatasetSpi.java | 32 +- .../server/datanode/fsdataset/FsVolumeSpi.java | 110 +++ .../server/datanode/fsdataset/RollingLogs.java | 73 -- .../datanode/fsdataset/impl/FsDatasetImpl.java | 44 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 347 ++++++++ .../datanode/fsdataset/impl/FsVolumeList.java | 24 +- .../fsdataset/impl/RollingLogsImpl.java | 241 ----- .../src/main/resources/hdfs-default.xml | 20 + .../org/apache/hadoop/hdfs/DFSTestUtil.java | 16 + .../hadoop/hdfs/TestDatanodeBlockScanner.java | 551 ------------ .../org/apache/hadoop/hdfs/TestReplication.java | 3 +- .../TestOverReplicatedBlocks.java | 13 +- .../server/datanode/BlockReportTestBase.java | 7 +- .../hdfs/server/datanode/DataNodeTestUtils.java | 24 - 
.../server/datanode/SimulatedFSDataset.java | 22 +- .../hdfs/server/datanode/TestBlockScanner.java | 680 +++++++++++++++ .../server/datanode/TestDirectoryScanner.java | 16 + .../TestMultipleNNDataBlockScanner.java | 245 ------ .../extdataset/ExternalDatasetImpl.java | 7 - .../extdataset/ExternalRollingLogs.java | 92 -- .../datanode/extdataset/ExternalVolumeImpl.java | 17 + .../extdataset/TestExternalDataset.java | 9 - .../fsdataset/impl/FsVolumeListTest.java | 17 +- .../fsdataset/impl/TestFsDatasetImpl.java | 30 +- .../impl/TestInterDatanodeProtocol.java | 4 +- .../namenode/snapshot/SnapshotTestHelper.java | 4 +- 37 files changed, 2288 insertions(+), 2629 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 25ad33b..866b765 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -140,6 +140,9 @@ Trunk (Unreleased) class and constructor to public; and fix FsDatasetSpi to use generic type instead of FsVolumeImpl. (David Powell and Joe Pallas via szetszwo) + HDFS-7430. 
Rewrite the BlockScanner to use O(1) memory and use multiple + threads (cmccabe) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index fb958f1..60581b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -441,6 +441,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096; public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours"; public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0; + public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second"; + public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L; public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed"; public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true; public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java index 
4a54bed..dfeacde 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java @@ -656,9 +656,6 @@ class BPOfferService { // Block toDelete[] = bcmd.getBlocks(); try { - if (dn.blockScanner != null) { - dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete); - } // using global fsdataset dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete); } catch(IOException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index e6409ab..e396727 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -736,12 +736,6 @@ class BPServiceActor implements Runnable { DatanodeCommand cmd = cacheReport(); processCommand(new DatanodeCommand[]{ cmd }); - // Now safe to start scanning the block pool. - // If it has already been started, this is a no-op. - if (dn.blockScanner != null) { - dn.blockScanner.addBlockPool(bpos.getBlockPoolId()); - } - // // There is no work to do; sleep until hearbeat timer elapses, // or work arrives, and then iterate again. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java deleted file mode 100644 index f36fea1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ /dev/null @@ -1,872 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.common.GenerationStamp; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; -import org.apache.hadoop.hdfs.util.DataTransferThrottler; -import org.apache.hadoop.util.GSet; -import org.apache.hadoop.util.LightWeightGSet; -import org.apache.hadoop.util.LightWeightGSet.LinkedElement; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.Time; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Scans the block files under a block pool and verifies that the - * files are not corrupt. - * This keeps track of blocks and their last verification times. - * Currently it does not modify the metadata for block. 
- */ - -class BlockPoolSliceScanner { - - public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class); - - private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS"; - - private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec - private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec - private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks - - private static final String VERIFICATION_PREFIX = "dncp_block_verification.log"; - - private final String blockPoolId; - private final long scanPeriod; - private final AtomicLong lastScanTime = new AtomicLong(); - - private final DataNode datanode; - private final FsDatasetSpi<? extends FsVolumeSpi> dataset; - - private final SortedSet<BlockScanInfo> blockInfoSet - = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR); - - private final SortedSet<BlockScanInfo> newBlockInfoSet = - new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR); - - private final GSet<Block, BlockScanInfo> blockMap - = new LightWeightGSet<Block, BlockScanInfo>( - LightWeightGSet.computeCapacity(0.5, "BlockMap")); - - // processedBlocks keeps track of which blocks are scanned - // since the last run. 
- private volatile HashMap<Long, Integer> processedBlocks; - - private long totalScans = 0; - private long totalScanErrors = 0; - private long totalTransientErrors = 0; - private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only - - private long currentPeriodStart = Time.monotonicNow(); - private long bytesLeft = 0; // Bytes to scan in this period - private long totalBytesToScan = 0; - private boolean isNewPeriod = true; - private int lastScanTimeDifference = 5*60*1000; - - private final LogFileHandler verificationLog; - - private final DataTransferThrottler throttler = new DataTransferThrottler( - 200, MAX_SCAN_RATE); - - private static enum ScanType { - IMMEDIATE_SCAN, - VERIFICATION_SCAN, // scanned as part of periodic verfication - NONE, - } - - // Extend Block because in the DN process there's a 1-to-1 correspondence of - // BlockScanInfo to Block instances, so by extending rather than containing - // Block, we can save a bit of Object overhead (about 24 bytes per block - // replica.) - static class BlockScanInfo extends Block - implements LightWeightGSet.LinkedElement { - - /** Compare the info by the last scan time. */ - static final Comparator<BlockScanInfo> LAST_SCAN_TIME_COMPARATOR - = new Comparator<BlockPoolSliceScanner.BlockScanInfo>() { - - @Override - public int compare(BlockScanInfo left, BlockScanInfo right) { - final ScanType leftNextScanType = left.nextScanType; - final ScanType rightNextScanType = right.nextScanType; - final long l = left.lastScanTime; - final long r = right.lastScanTime; - // Compare by nextScanType if they are same then compare by - // lastScanTimes - // compare blocks itself if scantimes are same to avoid. - // because TreeMap uses comparator if available to check existence of - // the object. - int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType); - return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1: l < r? -1: l > r? 
1: left.compareTo(right); - } - }; - - long lastScanTime = 0; - ScanType lastScanType = ScanType.NONE; - boolean lastScanOk = true; - private LinkedElement next; - ScanType nextScanType = ScanType.VERIFICATION_SCAN; - - BlockScanInfo(Block block) { - super(block); - } - - @Override - public int hashCode() { - return super.hashCode(); - } - - @Override - public boolean equals(Object that) { - if (this == that) { - return true; - } - return super.equals(that); - } - - long getLastScanTime() { - return (lastScanType == ScanType.NONE) ? 0 : lastScanTime; - } - - @Override - public void setNext(LinkedElement next) { - this.next = next; - } - - @Override - public LinkedElement getNext() { - return next; - } - } - - BlockPoolSliceScanner(String bpid, DataNode datanode, - FsDatasetSpi<? extends FsVolumeSpi> dataset, Configuration conf) { - this.datanode = datanode; - this.dataset = dataset; - this.blockPoolId = bpid; - - long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, - DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT); - if (hours <= 0) { - hours = DEFAULT_SCAN_PERIOD_HOURS; - } - this.scanPeriod = hours * 3600 * 1000; - LOG.info("Periodic Block Verification Scanner initialized with interval " - + hours + " hours for block pool " + bpid); - - // get the list of blocks and arrange them in random order - List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId); - Collections.shuffle(arr); - - long scanTime = -1; - for (Block block : arr) { - BlockScanInfo info = new BlockScanInfo( block ); - info.lastScanTime = scanTime--; - //still keep 'info.lastScanType' to NONE. - addBlockInfo(info, false); - } - - RollingLogs rollingLogs = null; - try { - rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX); - } catch (IOException e) { - LOG.warn("Could not open verfication log. " + - "Verification times are not stored."); - } - verificationLog = rollingLogs == null? 
null: new LogFileHandler(rollingLogs); - } - - String getBlockPoolId() { - return blockPoolId; - } - - private void updateBytesToScan(long len, long lastScanTime) { - // len could be negative when a block is deleted. - totalBytesToScan += len; - if ( lastScanTime < currentPeriodStart ) { - bytesLeft += len; - } - // Should we change throttler bandwidth every time bytesLeft changes? - // not really required. - } - - /** - * Add the BlockScanInfo to sorted set of blockScanInfo - * @param info BlockScanInfo to be added - * @param isNewBlock true if the block is the new Block, false if - * BlockScanInfo is being updated with new scanTime - */ - private synchronized void addBlockInfo(BlockScanInfo info, - boolean isNewBlock) { - boolean added = false; - if (isNewBlock) { - // check whether the block already present - boolean exists = blockInfoSet.contains(info); - added = !exists && newBlockInfoSet.add(info); - } else { - added = blockInfoSet.add(info); - } - blockMap.put(info); - - if (added) { - updateBytesToScan(info.getNumBytes(), info.lastScanTime); - } - } - - private synchronized void delBlockInfo(BlockScanInfo info) { - boolean exists = blockInfoSet.remove(info); - if (!exists){ - exists = newBlockInfoSet.remove(info); - } - blockMap.remove(info); - - if (exists) { - updateBytesToScan(-info.getNumBytes(), info.lastScanTime); - } - } - - /** Update blockMap by the given LogEntry */ - private synchronized void updateBlockInfo(LogEntry e) { - BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp)); - - if (info != null && e.verificationTime > 0 && - info.lastScanTime < e.verificationTime) { - delBlockInfo(info); - if (info.nextScanType != ScanType.IMMEDIATE_SCAN) { - info.lastScanTime = e.verificationTime; - } - info.lastScanType = ScanType.VERIFICATION_SCAN; - addBlockInfo(info, false); - } - } - - private synchronized long getNewBlockScanTime() { - /* If there are a lot of blocks, this returns a random time with in - * the scan period. 
Otherwise something sooner. - */ - long period = Math.min(scanPeriod, - Math.max(blockMap.size(),1) * 600 * 1000L); - int periodInt = Math.abs((int)period); - return Time.monotonicNow() - scanPeriod + - DFSUtil.getRandom().nextInt(periodInt); - } - - /** Adds block to list of blocks - * @param scanNow - true if we want to make that particular block a high - * priority one to scan immediately - **/ - synchronized void addBlock(ExtendedBlock block, boolean scanNow) { - BlockScanInfo info = blockMap.get(block.getLocalBlock()); - long lastScanTime = 0; - if (info != null) { - lastScanTime = info.lastScanTime; - } - // If the particular block is scanned in last 5 minutes, the no need to - // verify that block again - if (scanNow && Time.monotonicNow() - lastScanTime < - lastScanTimeDifference) { - return; - } - - if ( info != null ) { - LOG.warn("Adding an already existing block " + block); - delBlockInfo(info); - } - - info = new BlockScanInfo(block.getLocalBlock()); - info.lastScanTime = getNewBlockScanTime(); - if (scanNow) { - // Create a new BlockScanInfo object and set the lastScanTime to 0 - // which will make it the high priority block - LOG.info("Adding block for immediate verification " + block); - info.nextScanType = ScanType.IMMEDIATE_SCAN; - } - - addBlockInfo(info, true); - adjustThrottler(); - } - - /** Deletes the block from internal structures */ - synchronized void deleteBlock(Block block) { - BlockScanInfo info = blockMap.get(block); - if (info != null) { - delBlockInfo(info); - } - } - - @VisibleForTesting - long getTotalScans() { - return totalScans; - } - - /** @return the last scan time for the block pool. */ - long getLastScanTime() { - return lastScanTime.get(); - } - - /** @return the last scan time the given block. */ - synchronized long getLastScanTime(Block block) { - BlockScanInfo info = blockMap.get(block); - return info == null? 
0: info.lastScanTime; - } - - /** Deletes blocks from internal structures */ - void deleteBlocks(Block[] blocks) { - for ( Block b : blocks ) { - deleteBlock(b); - } - } - - private synchronized void updateScanStatus(BlockScanInfo info, - ScanType type, - boolean scanOk) { - delBlockInfo(info); - - long now = Time.monotonicNow(); - info.lastScanType = type; - info.lastScanTime = now; - info.lastScanOk = scanOk; - info.nextScanType = ScanType.VERIFICATION_SCAN; - addBlockInfo(info, false); - - // Don't update meta data if the verification failed. - if (!scanOk) { - return; - } - - if (verificationLog != null) { - verificationLog.append(now, info.getGenerationStamp(), - info.getBlockId()); - } - } - - private void handleScanFailure(ExtendedBlock block) { - LOG.info("Reporting bad " + block); - try { - datanode.reportBadBlocks(block); - } catch (IOException ie) { - // it is bad, but not bad enough to shutdown the scanner - LOG.warn("Cannot report bad " + block.getBlockId()); - } - } - - @VisibleForTesting - synchronized void setLastScanTimeDifference(int lastScanTimeDifference) { - this.lastScanTimeDifference = lastScanTimeDifference; - } - - static private class LogEntry { - - long blockId = -1; - long verificationTime = -1; - long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP; - - /** - * The format consists of single line with multiple entries. each - * entry is in the form : name="value". - * This simple text and easily extendable and easily parseable with a - * regex. 
- */ - private static final Pattern entryPattern = - Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*"); - - static String toString(long verificationTime, long genStamp, long blockId, - DateFormat dateFormat) { - return "\ndate=\"" + dateFormat.format(new Date(verificationTime)) - + "\"\t time=\"" + verificationTime - + "\"\t genstamp=\"" + genStamp - + "\"\t id=\"" + blockId + "\""; - } - - static LogEntry parseEntry(String line) { - LogEntry entry = new LogEntry(); - - Matcher matcher = entryPattern.matcher(line); - while (matcher.find()) { - String name = matcher.group(1); - String value = matcher.group(2); - - try { - if (name.equals("id")) { - entry.blockId = Long.parseLong(value); - } else if (name.equals("time")) { - entry.verificationTime = Long.parseLong(value); - } else if (name.equals("genstamp")) { - entry.genStamp = Long.parseLong(value); - } - } catch(NumberFormatException nfe) { - LOG.warn("Cannot parse line: " + line, nfe); - return null; - } - } - - return entry; - } - } - - private synchronized void adjustThrottler() { - long timeLeft = Math.max(1L, - currentPeriodStart + scanPeriod - Time.monotonicNow()); - long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE); - throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE)); - } - - @VisibleForTesting - void verifyBlock(ExtendedBlock block) { - BlockSender blockSender = null; - - /* In case of failure, attempt to read second time to reduce - * transient errors. How do we flush block data from kernel - * buffers before the second read? - */ - for (int i=0; i<2; i++) { - boolean second = (i > 0); - - try { - adjustThrottler(); - - blockSender = new BlockSender(block, 0, -1, false, true, true, - datanode, null, CachingStrategy.newDropBehind()); - - DataOutputStream out = - new DataOutputStream(new IOUtils.NullOutputStream()); - - blockSender.sendBlock(out, null, throttler); - - LOG.info((second ? 
"Second " : "") + - "Verification succeeded for " + block); - - if ( second ) { - totalTransientErrors++; - } - - updateScanStatus((BlockScanInfo)block.getLocalBlock(), - ScanType.VERIFICATION_SCAN, true); - - return; - } catch (IOException e) { - updateScanStatus((BlockScanInfo)block.getLocalBlock(), - ScanType.VERIFICATION_SCAN, false); - - // If the block does not exists anymore, then its not an error - if (!dataset.contains(block)) { - LOG.info(block + " is no longer in the dataset"); - deleteBlock(block.getLocalBlock()); - return; - } - - // If the block exists, the exception may due to a race with write: - // The BlockSender got an old block path in rbw. BlockReceiver removed - // the rbw block from rbw to finalized but BlockSender tried to open the - // file before BlockReceiver updated the VolumeMap. The state of the - // block can be changed again now, so ignore this error here. If there - // is a block really deleted by mistake, DirectoryScan should catch it. - if (e instanceof FileNotFoundException ) { - LOG.info("Verification failed for " + block + - " - may be due to race with write"); - deleteBlock(block.getLocalBlock()); - return; - } - - LOG.warn((second ? 
"Second " : "First ") + "Verification failed for " - + block, e); - - if (second) { - totalScanErrors++; - datanode.getMetrics().incrBlockVerificationFailures(); - handleScanFailure(block); - return; - } - } finally { - IOUtils.closeStream(blockSender); - datanode.getMetrics().incrBlocksVerified(); - totalScans++; - } - } - } - - private synchronized long getEarliestScanTime() { - if (!blockInfoSet.isEmpty()) { - return blockInfoSet.first().lastScanTime; - } - return Long.MAX_VALUE; - } - - private synchronized boolean isFirstBlockProcessed() { - if (!blockInfoSet.isEmpty()) { - if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) { - return false; - } - long blockId = blockInfoSet.first().getBlockId(); - if ((processedBlocks.get(blockId) != null) - && (processedBlocks.get(blockId) == 1)) { - return true; - } - } - return false; - } - - // Picks one block and verifies it - private void verifyFirstBlock() { - BlockScanInfo block = null; - synchronized (this) { - if (!blockInfoSet.isEmpty()) { - block = blockInfoSet.first(); - } - } - if ( block != null ) { - verifyBlock(new ExtendedBlock(blockPoolId, block)); - processedBlocks.put(block.getBlockId(), 1); - } - } - - // Used for tests only - int getBlocksScannedInLastRun() { - return totalBlocksScannedInLastRun.get(); - } - - /** - * Reads the current and previous log files (if any) and marks the blocks - * processed if they were processed within last scan period. Copies the log - * records of recently scanned blocks from previous to current file. - * Returns false if the process was interrupted because the thread is marked - * to exit. - */ - private boolean assignInitialVerificationTimes() { - //First updates the last verification times from the log file. - if (verificationLog != null) { - long now = Time.monotonicNow(); - RollingLogs.LineIterator logIterator = null; - try { - logIterator = verificationLog.logs.iterator(false); - // update verification times from the verificationLog. 
- while (logIterator.hasNext()) { - if (!datanode.shouldRun - || datanode.blockScanner.blockScannerThread.isInterrupted()) { - return false; - } - LogEntry entry = LogEntry.parseEntry(logIterator.next()); - if (entry != null) { - updateBlockInfo(entry); - if (now - entry.verificationTime < scanPeriod) { - BlockScanInfo info = blockMap.get(new Block(entry.blockId, 0, - entry.genStamp)); - if (info != null) { - if (processedBlocks.get(entry.blockId) == null) { - if (isNewPeriod) { - updateBytesLeft(-info.getNumBytes()); - } - processedBlocks.put(entry.blockId, 1); - } - if (logIterator.isLastReadFromPrevious()) { - // write the log entry to current file - // so that the entry is preserved for later runs. - verificationLog.append(entry.verificationTime, entry.genStamp, - entry.blockId); - } - } - } - } - } - } catch (IOException e) { - LOG.warn("Failed to read previous verification times.", e); - } finally { - IOUtils.closeStream(logIterator); - } - isNewPeriod = false; - } - - - /* Before this loop, entries in blockInfoSet that are not - * updated above have lastScanTime of <= 0 . Loop until first entry has - * lastModificationTime > 0. - */ - synchronized (this) { - final int numBlocks = Math.max(blockMap.size(), 1); - // Initially spread the block reads over half of scan period - // so that we don't keep scanning the blocks too quickly when restarted. 
- long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L); - long lastScanTime = Time.monotonicNow() - scanPeriod; - - if (!blockInfoSet.isEmpty()) { - BlockScanInfo info; - while ((info = blockInfoSet.first()).lastScanTime < 0) { - delBlockInfo(info); - info.lastScanTime = lastScanTime; - lastScanTime += verifyInterval; - addBlockInfo(info, false); - } - } - } - - return true; - } - - private synchronized void updateBytesLeft(long len) { - bytesLeft += len; - } - - private synchronized void startNewPeriod() { - LOG.info("Starting a new period : work left in prev period : " - + String.format("%.2f%%", totalBytesToScan == 0 ? 0 - : (bytesLeft * 100.0) / totalBytesToScan)); - - // reset the byte counts : - bytesLeft = totalBytesToScan; - currentPeriodStart = Time.monotonicNow(); - isNewPeriod = true; - } - - private synchronized boolean workRemainingInCurrentPeriod() { - if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" + - currentPeriodStart + ", period=" + scanPeriod + ", now=" + - Time.monotonicNow() + " " + blockPoolId); - } - return false; - } else { - return true; - } - } - - void scanBlockPoolSlice() { - if (!workRemainingInCurrentPeriod()) { - return; - } - - // Create a new processedBlocks structure - processedBlocks = new HashMap<Long, Integer>(); - if (!assignInitialVerificationTimes()) { - return; - } - // Start scanning - try { - scan(); - } finally { - totalBlocksScannedInLastRun.set(processedBlocks.size()); - lastScanTime.set(Time.monotonicNow()); - } - } - - /** - * Shuts down this BlockPoolSliceScanner and releases any internal resources. 
- */ - void shutdown() { - if (verificationLog != null) { - verificationLog.close(); - } - } - - private void scan() { - if (LOG.isDebugEnabled()) { - LOG.debug("Starting to scan blockpool: " + blockPoolId); - } - try { - adjustThrottler(); - - while (datanode.shouldRun - && !datanode.blockScanner.blockScannerThread.isInterrupted() - && datanode.isBPServiceAlive(blockPoolId)) { - long now = Time.monotonicNow(); - synchronized (this) { - if ( now >= (currentPeriodStart + scanPeriod)) { - startNewPeriod(); - } - } - if (((now - getEarliestScanTime()) >= scanPeriod) - || ((!blockInfoSet.isEmpty()) && !(this.isFirstBlockProcessed()))) { - verifyFirstBlock(); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("All remaining blocks were processed recently, " - + "so this run is complete"); - } - break; - } - } - } catch (RuntimeException e) { - LOG.warn("RuntimeException during BlockPoolScanner.scan()", e); - throw e; - } finally { - rollVerificationLogs(); - rollNewBlocksInfo(); - if (LOG.isDebugEnabled()) { - LOG.debug("Done scanning block pool: " + blockPoolId); - } - } - } - - // add new blocks to scan in next iteration - private synchronized void rollNewBlocksInfo() { - for (BlockScanInfo newBlock : newBlockInfoSet) { - blockInfoSet.add(newBlock); - } - newBlockInfoSet.clear(); - } - - private synchronized void rollVerificationLogs() { - if (verificationLog != null) { - try { - verificationLog.logs.roll(); - } catch (IOException ex) { - LOG.warn("Received exception: ", ex); - verificationLog.close(); - } - } - } - - - synchronized void printBlockReport(StringBuilder buffer, - boolean summaryOnly) { - long oneHour = 3600*1000; - long oneDay = 24*oneHour; - long oneWeek = 7*oneDay; - long fourWeeks = 4*oneWeek; - - int inOneHour = 0; - int inOneDay = 0; - int inOneWeek = 0; - int inFourWeeks = 0; - int inScanPeriod = 0; - int neverScanned = 0; - - DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT); - - int total = blockInfoSet.size(); - - long now = 
Time.monotonicNow(); - - Date date = new Date(); - - for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) { - BlockScanInfo info = it.next(); - - long scanTime = info.getLastScanTime(); - long diff = now - scanTime; - - if (diff <= oneHour) inOneHour++; - if (diff <= oneDay) inOneDay++; - if (diff <= oneWeek) inOneWeek++; - if (diff <= fourWeeks) inFourWeeks++; - if (diff <= scanPeriod) inScanPeriod++; - if (scanTime <= 0) neverScanned++; - - if (!summaryOnly) { - date.setTime(scanTime); - String scanType = - (info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none"; - buffer.append(String.format("%-26s : status : %-6s type : %-6s" + - " scan time : " + - "%-15d %s%n", info, - (info.lastScanOk ? "ok" : "failed"), - scanType, scanTime, - (scanTime <= 0) ? "not yet verified" : - dateFormat.format(date))); - } - } - - double pctPeriodLeft = (scanPeriod + currentPeriodStart - now) - *100.0/scanPeriod; - double pctProgress = (totalBytesToScan == 0) ? 100 : - (totalBytesToScan-bytesLeft)*100.0/totalBytesToScan; - - buffer.append(String.format("%nTotal Blocks : %6d" + - "%nVerified in last hour : %6d" + - "%nVerified in last day : %6d" + - "%nVerified in last week : %6d" + - "%nVerified in last four weeks : %6d" + - "%nVerified in SCAN_PERIOD : %6d" + - "%nNot yet verified : %6d" + - "%nVerified since restart : %6d" + - "%nScans since restart : %6d" + - "%nScan errors since restart : %6d" + - "%nTransient scan errors : %6d" + - "%nCurrent scan rate limit KBps : %6d" + - "%nProgress this period : %6.0f%%" + - "%nTime left in cur period : %6.2f%%" + - "%n", - total, inOneHour, inOneDay, inOneWeek, - inFourWeeks, inScanPeriod, neverScanned, - totalScans, totalScans, - totalScanErrors, totalTransientErrors, - Math.round(throttler.getBandwidth()/1024.0), - pctProgress, pctPeriodLeft)); - } - - /** - * This class takes care of log file used to store the last verification - * times of the blocks. 
- */ - private static class LogFileHandler { - private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT); - - private final RollingLogs logs; - - private LogFileHandler(RollingLogs logs) { - this.logs = logs; - } - - void append(long verificationTime, long genStamp, long blockId) { - final String m = LogEntry.toString(verificationTime, genStamp, blockId, - dateFormat); - try { - logs.appender().append(m); - } catch (IOException e) { - LOG.warn("Failed to append to " + logs + ", m=" + m, e); - } - } - - void close() { - try { - logs.appender().close(); - } catch (IOException e) { - LOG.warn("Failed to close the appender of " + logs, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index df8dd5c..12041a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -193,20 +193,12 @@ class BlockReceiver implements Closeable { break; case PIPELINE_SETUP_APPEND: replicaHandler = datanode.data.append(block, newGs, minBytesRcvd); - if (datanode.blockScanner != null) { // remove from block scanner - datanode.blockScanner.deleteBlock(block.getBlockPoolId(), - block.getLocalBlock()); - } block.setGenerationStamp(newGs); datanode.notifyNamenodeReceivingBlock( block, replicaHandler.getReplica().getStorageUuid()); break; case PIPELINE_SETUP_APPEND_RECOVERY: replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd); - if (datanode.blockScanner != null) { // remove 
from block scanner - datanode.blockScanner.deleteBlock(block.getBlockPoolId(), - block.getLocalBlock()); - } block.setGenerationStamp(newGs); datanode.notifyNamenodeReceivingBlock( block, replicaHandler.getReplica().getStorageUuid()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java new file mode 100644 index 0000000..7429fff --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; + +import java.io.IOException; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +@InterfaceAudience.Private +public class BlockScanner { + public static final Logger LOG = + LoggerFactory.getLogger(BlockScanner.class); + + /** + * The DataNode that this scanner is associated with. + */ + private final DataNode datanode; + + /** + * Maps Storage IDs to VolumeScanner objects. + */ + private final TreeMap<String, VolumeScanner> scanners = + new TreeMap<String, VolumeScanner>(); + + /** + * The scanner configuration. + */ + private final Conf conf; + + /** + * The cached scanner configuration. + */ + static class Conf { + // These are a few internal configuration keys used for unit tests. + // They can't be set unless the static boolean allowUnitTestSettings has + // been set to true. 
+ + @VisibleForTesting + static final String INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS = + "internal.dfs.datanode.scan.period.ms.key"; + + @VisibleForTesting + static final String INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER = + "internal.volume.scanner.scan.result.handler"; + + @VisibleForTesting + static final String INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS = + "internal.dfs.block.scanner.max_staleness.ms"; + + @VisibleForTesting + static final long INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT = + TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES); + + @VisibleForTesting + static final String INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS = + "dfs.block.scanner.cursor.save.interval.ms"; + + @VisibleForTesting + static final long + INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT = + TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES); + + static boolean allowUnitTestSettings = false; + final long targetBytesPerSec; + final long maxStalenessMs; + final long scanPeriodMs; + final long cursorSaveMs; + final Class<? 
extends ScanResultHandler> resultHandler; + + private static long getUnitTestLong(Configuration conf, String key, + long defVal) { + if (allowUnitTestSettings) { + return conf.getLong(key, defVal); + } else { + return defVal; + } + } + + @SuppressWarnings("unchecked") + Conf(Configuration conf) { + this.targetBytesPerSec = Math.max(0L, conf.getLong( + DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, + DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT)); + this.maxStalenessMs = Math.max(0L, getUnitTestLong(conf, + INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS, + INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT)); + this.scanPeriodMs = Math.max(0L, + getUnitTestLong(conf, INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, + TimeUnit.MILLISECONDS.convert(conf.getLong( + DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, + DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT), TimeUnit.HOURS))); + this.cursorSaveMs = Math.max(0L, getUnitTestLong(conf, + INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, + INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT)); + if (allowUnitTestSettings) { + this.resultHandler = (Class<? extends ScanResultHandler>) + conf.getClass(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER, + ScanResultHandler.class); + } else { + this.resultHandler = ScanResultHandler.class; + } + } + } + + public BlockScanner(DataNode datanode, Configuration conf) { + this.datanode = datanode; + this.conf = new Conf(conf); + if (isEnabled()) { + LOG.info("Initialized block scanner with targetBytesPerSec {}", + this.conf.targetBytesPerSec); + } else { + LOG.info("Disabled block scanner."); + } + } + + /** + * Returns true if the block scanner is enabled.<p/> + * + * If the block scanner is disabled, no volume scanners will be created, and + * no threads will start. + */ + public boolean isEnabled() { + return (conf.scanPeriodMs) > 0 && (conf.targetBytesPerSec > 0); + } + + /** + * Set up a scanner for the given block pool and volume. + * + * @param ref A reference to the volume. 
+ */ + public synchronized void addVolumeScanner(FsVolumeReference ref) { + boolean success = false; + try { + FsVolumeSpi volume = ref.getVolume(); + if (!isEnabled()) { + LOG.debug("Not adding volume scanner for {}, because the block " + + "scanner is disabled.", volume.getBasePath()); + return; + } + VolumeScanner scanner = scanners.get(volume.getStorageID()); + if (scanner != null) { + LOG.error("Already have a scanner for volume {}.", + volume.getBasePath()); + return; + } + LOG.debug("Adding scanner for volume {} (StorageID {})", + volume.getBasePath(), volume.getStorageID()); + scanner = new VolumeScanner(conf, datanode, ref); + scanner.start(); + scanners.put(volume.getStorageID(), scanner); + success = true; + } finally { + if (!success) { + // If we didn't create a new VolumeScanner object, we don't + // need this reference to the volume. + IOUtils.cleanup(null, ref); + } + } + } + + /** + * Stops and removes a volume scanner.<p/> + * + * This function will block until the volume scanner has stopped. + * + * @param volume The volume to remove. + */ + public synchronized void removeVolumeScanner(FsVolumeSpi volume) { + if (!isEnabled()) { + LOG.debug("Not removing volume scanner for {}, because the block " + + "scanner is disabled.", volume.getStorageID()); + return; + } + VolumeScanner scanner = scanners.get(volume.getStorageID()); + if (scanner == null) { + LOG.warn("No scanner found to remove for volumeId {}", + volume.getStorageID()); + return; + } + LOG.info("Removing scanner for volume {} (StorageID {})", + volume.getBasePath(), volume.getStorageID()); + scanner.shutdown(); + scanners.remove(volume.getStorageID()); + Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES); + } + + /** + * Stops and removes all volume scanners.<p/> + * + * This function will block until all the volume scanners have stopped. 
+ */ + public synchronized void removeAllVolumeScanners() { + for (Entry<String, VolumeScanner> entry : scanners.entrySet()) { + entry.getValue().shutdown(); + } + for (Entry<String, VolumeScanner> entry : scanners.entrySet()) { + Uninterruptibles.joinUninterruptibly(entry.getValue(), + 5, TimeUnit.MINUTES); + } + scanners.clear(); + } + + /** + * Enable scanning a given block pool id. + * + * @param bpid The block pool id to enable scanning for. + */ + synchronized void enableBlockPoolId(String bpid) { + Preconditions.checkNotNull(bpid); + for (VolumeScanner scanner : scanners.values()) { + scanner.enableBlockPoolId(bpid); + } + } + + /** + * Disable scanning a given block pool id. + * + * @param bpid The block pool id to disable scanning for. + */ + synchronized void disableBlockPoolId(String bpid) { + Preconditions.checkNotNull(bpid); + for (VolumeScanner scanner : scanners.values()) { + scanner.disableBlockPoolId(bpid); + } + } + + @VisibleForTesting + synchronized VolumeScanner.Statistics getVolumeStats(String volumeId) { + VolumeScanner scanner = scanners.get(volumeId); + if (scanner == null) { + return null; + } + return scanner.getStatistics(); + } + + synchronized void printStats(StringBuilder p) { + // print out all bpids that we're scanning ? 
+ for (Entry<String, VolumeScanner> entry : scanners.entrySet()) { + entry.getValue().printStats(p); + } + } + + @InterfaceAudience.Private + public static class Servlet extends HttpServlet { + private static final long serialVersionUID = 1L; + + @Override + public void doGet(HttpServletRequest request, + HttpServletResponse response) throws IOException { + response.setContentType("text/plain"); + + DataNode datanode = (DataNode) + getServletContext().getAttribute("datanode"); + BlockScanner blockScanner = datanode.getBlockScanner(); + + StringBuilder buffer = new StringBuilder(8 * 1024); + if (!blockScanner.isEnabled()) { + LOG.warn("Periodic block scanner is not running"); + buffer.append("Periodic block scanner is not running. " + + "Please check the datanode log if this is unexpected."); + } else { + buffer.append("Block Scanner Statistics\n\n"); + blockScanner.printStats(buffer); + } + String resp = buffer.toString(); + LOG.trace("Returned Servlet info {}", resp); + response.getWriter().write(resp); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 2d312d7..182b366 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -600,9 +600,6 @@ class BlockSender implements java.io.Closeable { String ioem = e.getMessage(); if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); - //Something might be wrong with the 
block. Make this block the high - //priority block for verification. - datanode.blockScanner.addBlock(block, true); } } throw ioeToSocketException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java deleted file mode 100644 index 450c2b1..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hdfs.server.datanode; - -import java.io.IOException; -import java.util.TreeMap; - -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; - -import com.google.common.annotations.VisibleForTesting; - -/** - * DataBlockScanner manages block scanning for all the block pools. For each - * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate - * thread to scan the blocks for that block pool. When a {@link BPOfferService} - * becomes alive or dies, blockPoolScannerMap in this class is updated. - */ -@InterfaceAudience.Private -public class DataBlockScanner implements Runnable { - public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); - private final DataNode datanode; - private final FsDatasetSpi<? extends FsVolumeSpi> dataset; - private final Configuration conf; - - static final int SLEEP_PERIOD_MS = 5 * 1000; - - /** - * Map to find the BlockPoolScanner for a given block pool id. This is updated - * when a BPOfferService becomes alive or dies. - */ - private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap = - new TreeMap<String, BlockPoolSliceScanner>(); - Thread blockScannerThread = null; - - DataBlockScanner(DataNode datanode, - FsDatasetSpi<? 
extends FsVolumeSpi> dataset, - Configuration conf) { - this.datanode = datanode; - this.dataset = dataset; - this.conf = conf; - } - - @Override - public void run() { - String currentBpId = ""; - boolean firstRun = true; - while (datanode.shouldRun && !Thread.interrupted()) { - //Sleep everytime except in the first iteration. - if (!firstRun) { - try { - Thread.sleep(SLEEP_PERIOD_MS); - } catch (InterruptedException ex) { - // Interrupt itself again to set the interrupt status - blockScannerThread.interrupt(); - continue; - } - } else { - firstRun = false; - } - - BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId); - if (bpScanner == null) { - // Possible if thread is interrupted - continue; - } - currentBpId = bpScanner.getBlockPoolId(); - // If BPOfferService for this pool is not alive, don't process it - if (!datanode.isBPServiceAlive(currentBpId)) { - LOG.warn("Block Pool " + currentBpId + " is not alive"); - // Remove in case BP service died abruptly without proper shutdown - removeBlockPool(currentBpId); - continue; - } - bpScanner.scanBlockPoolSlice(); - } - - // Call shutdown for each allocated BlockPoolSliceScanner. - for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) { - bpss.shutdown(); - } - } - - // Wait for at least one block pool to be up - private void waitForInit() { - while ((getBlockPoolSetSize() < datanode.getAllBpOs().length) - || (getBlockPoolSetSize() < 1)) { - try { - Thread.sleep(SLEEP_PERIOD_MS); - } catch (InterruptedException e) { - blockScannerThread.interrupt(); - return; - } - } - } - - /** - * Find next block pool id to scan. There should be only one current - * verification log file. Find which block pool contains the current - * verification log file and that is used as the starting block pool id. If no - * current files are found start with first block-pool in the blockPoolSet. 
- * However, if more than one current files are found, the one with latest - * modification time is used to find the next block pool id. - */ - private BlockPoolSliceScanner getNextBPScanner(String currentBpId) { - - String nextBpId = null; - while (datanode.shouldRun && !blockScannerThread.isInterrupted()) { - waitForInit(); - synchronized (this) { - if (getBlockPoolSetSize() > 0) { - // Find nextBpId by the minimum of the last scan time - long lastScanTime = 0; - for (String bpid : blockPoolScannerMap.keySet()) { - final long t = getBPScanner(bpid).getLastScanTime(); - if (t != 0L) { - if (bpid == null || t < lastScanTime) { - lastScanTime = t; - nextBpId = bpid; - } - } - } - - // nextBpId can still be null if no current log is found, - // find nextBpId sequentially. - if (nextBpId == null) { - nextBpId = blockPoolScannerMap.higherKey(currentBpId); - if (nextBpId == null) { - nextBpId = blockPoolScannerMap.firstKey(); - } - } - if (nextBpId != null) { - return getBPScanner(nextBpId); - } - } - } - LOG.warn("No block pool is up, going to wait"); - try { - Thread.sleep(5000); - } catch (InterruptedException ex) { - LOG.warn("Received exception: " + ex); - blockScannerThread.interrupt(); - return null; - } - } - return null; - } - - private synchronized int getBlockPoolSetSize() { - return blockPoolScannerMap.size(); - } - - @VisibleForTesting - synchronized BlockPoolSliceScanner getBPScanner(String bpid) { - return blockPoolScannerMap.get(bpid); - } - - private synchronized String[] getBpIdList() { - return blockPoolScannerMap.keySet().toArray( - new String[blockPoolScannerMap.keySet().size()]); - } - - public void addBlock(ExtendedBlock block, boolean scanNow) { - BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId()); - if (bpScanner != null) { - bpScanner.addBlock(block, scanNow); - } else { - LOG.warn("No block pool scanner found for block pool id: " - + block.getBlockPoolId()); - } - } - - boolean isInitialized(String bpid) { - return 
getBPScanner(bpid) != null; - } - - public synchronized void printBlockReport(StringBuilder buffer, - boolean summary) { - String[] bpIdList = getBpIdList(); - if (bpIdList == null || bpIdList.length == 0) { - buffer.append("Periodic block scanner is not yet initialized. " - + "Please check back again after some time."); - return; - } - for (String bpid : bpIdList) { - BlockPoolSliceScanner bpScanner = getBPScanner(bpid); - buffer.append("\n\nBlock report for block pool: "+bpid + "\n"); - bpScanner.printBlockReport(buffer, summary); - buffer.append("\n"); - } - } - - public void deleteBlock(String poolId, Block toDelete) { - BlockPoolSliceScanner bpScanner = getBPScanner(poolId); - if (bpScanner != null) { - bpScanner.deleteBlock(toDelete); - } else { - LOG.warn("No block pool scanner found for block pool id: " - + poolId); - } - } - - public void deleteBlocks(String poolId, Block[] toDelete) { - BlockPoolSliceScanner bpScanner = getBPScanner(poolId); - if (bpScanner != null) { - bpScanner.deleteBlocks(toDelete); - } else { - LOG.warn("No block pool scanner found for block pool id: " - + poolId); - } - } - - public void shutdown() { - synchronized (this) { - if (blockScannerThread != null) { - blockScannerThread.interrupt(); - } - } - - // We cannot join within the synchronized block, because it would create a - // deadlock situation. blockScannerThread calls other synchronized methods. 
- if (blockScannerThread != null) { - try { - blockScannerThread.join(); - } catch (InterruptedException e) { - // shutting down anyway - } - } - } - - public synchronized void addBlockPool(String blockPoolId) { - if (blockPoolScannerMap.get(blockPoolId) != null) { - return; - } - BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId, - datanode, dataset, conf); - blockPoolScannerMap.put(blockPoolId, bpScanner); - LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size=" - + blockPoolScannerMap.size()); - } - - public synchronized void removeBlockPool(String blockPoolId) { - BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId); - if (bpss != null) { - bpss.shutdown(); - } - LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap"); - } - - @VisibleForTesting - long getBlocksScannedInLastRun(String bpid) throws IOException { - BlockPoolSliceScanner bpScanner = getBPScanner(bpid); - if (bpScanner == null) { - throw new IOException("Block Pool: "+bpid+" is not running"); - } else { - return bpScanner.getBlocksScannedInLastRun(); - } - } - - @VisibleForTesting - long getTotalScans(String bpid) throws IOException { - BlockPoolSliceScanner bpScanner = getBPScanner(bpid); - if (bpScanner == null) { - throw new IOException("Block Pool: "+bpid+" is not running"); - } else { - return bpScanner.getTotalScans(); - } - } - - @VisibleForTesting - public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) { - BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId()); - if (bpScanner != null) { - bpScanner.setLastScanTimeDifference(lastScanTimeDifference); - } else { - LOG.warn("No block pool scanner found for block pool id: " - + block.getBlockPoolId()); - } - } - - public void start() { - blockScannerThread = new Thread(this); - blockScannerThread.setDaemon(true); - blockScannerThread.start(); - } - - @InterfaceAudience.Private - public static class Servlet extends HttpServlet { 
- private static final long serialVersionUID = 1L; - - @Override - public void doGet(HttpServletRequest request, - HttpServletResponse response) throws IOException { - response.setContentType("text/plain"); - - DataNode datanode = (DataNode) getServletContext().getAttribute("datanode"); - DataBlockScanner blockScanner = datanode.blockScanner; - - boolean summary = (request.getParameter("listblocks") == null); - - StringBuilder buffer = new StringBuilder(8*1024); - if (blockScanner == null) { - LOG.warn("Periodic block scanner is not running"); - buffer.append("Periodic block scanner is not running. " + - "Please check the datanode log if this is unexpected."); - } else { - blockScanner.printBlockReport(buffer, summary); - } - response.getWriter().write(buffer.toString()); // extra copy! - } - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 12df9d6..c77bc3d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -316,7 +316,7 @@ public class DataNode extends ReconfigurableBase BlockPoolTokenSecretManager blockPoolTokenSecretManager; private boolean hasAnyBlockPoolRegistered = false; - volatile DataBlockScanner blockScanner = null; + private final BlockScanner blockScanner; private DirectoryScanner directoryScanner = null; /** Activated plug-ins. 
*/ @@ -365,6 +365,7 @@ public class DataNode extends ReconfigurableBase this.usersWithLocalPathAccess = null; this.connectToDnViaHostname = false; this.getHdfsBlockLocationsEnabled = false; + this.blockScanner = new BlockScanner(this, conf); } /** @@ -375,6 +376,7 @@ public class DataNode extends ReconfigurableBase final List<StorageLocation> dataDirs, final SecureResources resources) throws IOException { super(conf); + this.blockScanner = new BlockScanner(this, conf); this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); @@ -671,7 +673,8 @@ public class DataNode extends ReconfigurableBase this.infoServer.setAttribute("datanode", this); this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf); this.infoServer.addServlet(null, "/blockScannerReport", - DataBlockScanner.Servlet.class); + BlockScanner.Servlet.class); + this.infoServer.start(); InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0); @@ -772,56 +775,12 @@ public class DataNode extends ReconfigurableBase // Not a superuser. throw new AccessControlException(); } - -/** - * Initialize the datanode's periodic scanners: - * {@link DataBlockScanner} - * {@link DirectoryScanner} - * They report results on a per-blockpool basis but do their scanning - * on a per-Volume basis to minimize competition for disk iops. 
- * - * @param conf - Configuration has the run intervals and other - * parameters for these periodic scanners - */ - private void initPeriodicScanners(Configuration conf) { - initDataBlockScanner(conf); - initDirectoryScanner(conf); - } - + private void shutdownPeriodicScanners() { shutdownDirectoryScanner(); - shutdownDataBlockScanner(); - } - - /** - * See {@link DataBlockScanner} - */ - private synchronized void initDataBlockScanner(Configuration conf) { - if (blockScanner != null) { - return; - } - String reason = null; - assert data != null; - if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, - DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) { - reason = "verification is turned off by configuration"; - } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) { - reason = "verifcation is not supported by SimulatedFSDataset"; - } - if (reason == null) { - blockScanner = new DataBlockScanner(this, data, conf); - blockScanner.start(); - } else { - LOG.info("Periodic Block Verification scan disabled because " + reason); - } + blockScanner.removeAllVolumeScanners(); } - - private void shutdownDataBlockScanner() { - if (blockScanner != null) { - blockScanner.shutdown(); - } - } - + /** * See {@link DirectoryScanner} */ @@ -1250,9 +1209,8 @@ public class DataNode extends ReconfigurableBase // registering anywhere. If that's the case, we wouldn't have // a block pool id String bpId = bpos.getBlockPoolId(); - if (blockScanner != null) { - blockScanner.removeBlockPool(bpId); - } + + blockScanner.disableBlockPoolId(bpId); if (data != null) { data.shutdownBlockPool(bpId); @@ -1296,9 +1254,9 @@ public class DataNode extends ReconfigurableBase // failures. 
checkDiskError(); - initPeriodicScanners(conf); - + initDirectoryScanner(conf); data.addBlockPool(nsInfo.getBlockPoolID(), conf); + blockScanner.enableBlockPoolId(bpos.getBlockPoolId()); } BPOfferService[] getAllBpOs() { @@ -2168,10 +2126,6 @@ public class DataNode extends ReconfigurableBase LOG.warn("Cannot find BPOfferService for reporting block received for bpid=" + block.getBlockPoolId()); } - FsVolumeSpi volume = getFSDataset().getVolume(block); - if (blockScanner != null && !volume.isTransientStorage()) { - blockScanner.addBlock(block, false); - } } /** Start a single datanode daemon and wait for it to finish. @@ -2445,8 +2399,9 @@ public class DataNode extends ReconfigurableBase return data; } + @VisibleForTesting /** @return the block scanner. */ - public DataBlockScanner getBlockScanner() { + public BlockScanner getBlockScanner() { return blockScanner; }
