http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java new file mode 100644 index 0000000..781b4d3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * VolumeScanner scans a single volume. Each VolumeScanner has its own thread.<p/> + * They are all managed by the DataNode's BlockScanner. + */ +public class VolumeScanner extends Thread { + public static final Logger LOG = + LoggerFactory.getLogger(VolumeScanner.class); + + /** + * Number of seconds in a minute. + */ + private final static int SECONDS_PER_MINUTE = 60; + + /** + * Number of minutes in an hour. + */ + private final static int MINUTES_PER_HOUR = 60; + + /** + * Name of the block iterator used by this scanner. + */ + private final static String BLOCK_ITERATOR_NAME = "scanner"; + + /** + * The configuration. + */ + private final Conf conf; + + /** + * The DataNode this VolumEscanner is associated with. + */ + private final DataNode datanode; + + /** + * A reference to the volume that we're scanning. + */ + private final FsVolumeReference ref; + + /** + * The volume that we're scanning. 
+ */ + final FsVolumeSpi volume; + + /** + * The number of scanned bytes in each minute of the last hour.<p/> + * + * This array is managed as a circular buffer. We take the monotonic time and + * divide it up into one-minute periods. Each entry in the array represents + * how many bytes were scanned during that period. + */ + private final long scannedBytes[] = new long[MINUTES_PER_HOUR]; + + /** + * The sum of all the values of scannedBytes. + */ + private long scannedBytesSum = 0; + + /** + * The throttler to use with BlockSender objects. + */ + private final DataTransferThrottler throttler = new DataTransferThrottler(1); + + /** + * The null output stream to use with BlockSender objects. + */ + private final DataOutputStream nullStream = + new DataOutputStream(new IOUtils.NullOutputStream()); + + /** + * The block iterators associated with this VolumeScanner.<p/> + * + * Each block pool has its own BlockIterator. + */ + private final List<BlockIterator> blockIters = + new LinkedList<BlockIterator>(); + + /** + * The current block iterator, or null if there is none. + */ + private BlockIterator curBlockIter = null; + + /** + * True if the thread is stopping.<p/> + * Protected by this object's lock. + */ + private boolean stopping = false; + + /** + * The current minute, in monotonic terms. + */ + private long curMinute = 0; + + /** + * Handles scan results. 
+ */ + private final ScanResultHandler resultHandler; + + private final Statistics stats = new Statistics(); + + static class Statistics { + long bytesScannedInPastHour = 0; + long blocksScannedInCurrentPeriod = 0; + long blocksScannedSinceRestart = 0; + long scansSinceRestart = 0; + long scanErrorsSinceRestart = 0; + long nextBlockPoolScanStartMs = -1; + long blockPoolPeriodEndsMs = -1; + ExtendedBlock lastBlockScanned = null; + boolean eof = false; + + Statistics() { + } + + Statistics(Statistics other) { + this.bytesScannedInPastHour = other.bytesScannedInPastHour; + this.blocksScannedInCurrentPeriod = other.blocksScannedInCurrentPeriod; + this.blocksScannedSinceRestart = other.blocksScannedSinceRestart; + this.scansSinceRestart = other.scansSinceRestart; + this.scanErrorsSinceRestart = other.scanErrorsSinceRestart; + this.nextBlockPoolScanStartMs = other.nextBlockPoolScanStartMs; + this.blockPoolPeriodEndsMs = other.blockPoolPeriodEndsMs; + this.lastBlockScanned = other.lastBlockScanned; + this.eof = other.eof; + } + + @Override + public String toString() { + return new StringBuilder(). + append("Statistics{"). + append("bytesScannedInPastHour=").append(bytesScannedInPastHour). + append(", blocksScannedInCurrentPeriod="). + append(blocksScannedInCurrentPeriod). + append(", blocksScannedSinceRestart="). + append(blocksScannedSinceRestart). + append(", scansSinceRestart=").append(scansSinceRestart). + append(", scanErrorsSinceRestart=").append(scanErrorsSinceRestart). + append(", nextBlockPoolScanStartMs=").append(nextBlockPoolScanStartMs). + append(", blockPoolPeriodEndsMs=").append(blockPoolPeriodEndsMs). + append(", lastBlockScanned=").append(lastBlockScanned). + append(", eof=").append(eof). 
+ append("}").toString(); + } + } + + private static double positiveMsToHours(long ms) { + if (ms <= 0) { + return 0; + } else { + return TimeUnit.HOURS.convert(ms, TimeUnit.MILLISECONDS); + } + } + + public void printStats(StringBuilder p) { + p.append("Block scanner information for volume " + + volume.getStorageID() + " with base path " + volume.getBasePath() + + "%n"); + synchronized (stats) { + p.append(String.format("Bytes verified in last hour : %57d%n", + stats.bytesScannedInPastHour)); + p.append(String.format("Blocks scanned in current period : %57d%n", + stats.blocksScannedInCurrentPeriod)); + p.append(String.format("Blocks scanned since restart : %57d%n", + stats.blocksScannedSinceRestart)); + p.append(String.format("Block pool scans since restart : %57d%n", + stats.scansSinceRestart)); + p.append(String.format("Block scan errors since restart : %57d%n", + stats.scanErrorsSinceRestart)); + if (stats.nextBlockPoolScanStartMs > 0) { + p.append(String.format("Hours until next block pool scan : %57.3f%n", + positiveMsToHours(stats.nextBlockPoolScanStartMs - + Time.monotonicNow()))); + } + if (stats.blockPoolPeriodEndsMs > 0) { + p.append(String.format("Hours until possible pool rescan : %57.3f%n", + positiveMsToHours(stats.blockPoolPeriodEndsMs - + Time.now()))); + } + p.append(String.format("Last block scanned : %57s%n", + ((stats.lastBlockScanned == null) ? 
"none" : + stats.lastBlockScanned.toString()))); + p.append(String.format("More blocks to scan in period : %57s%n", + !stats.eof)); + p.append("%n"); + } + } + + static class ScanResultHandler { + private VolumeScanner scanner; + + public void setup(VolumeScanner scanner) { + LOG.trace("Starting VolumeScanner {}", + scanner.volume.getBasePath()); + this.scanner = scanner; + } + + public void handle(ExtendedBlock block, IOException e) { + FsVolumeSpi volume = scanner.volume; + if (e == null) { + LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath()); + return; + } + // If the block does not exist anymore, then it's not an error. + if (!volume.getDataset().contains(block)) { + LOG.debug("Volume {}: block {} is no longer in the dataset.", + volume.getBasePath(), block); + return; + } + // If the block exists, the exception may due to a race with write: + // The BlockSender got an old block path in rbw. BlockReceiver removed + // the rbw block from rbw to finalized but BlockSender tried to open the + // file before BlockReceiver updated the VolumeMap. The state of the + // block can be changed again now, so ignore this error here. If there + // is a block really deleted by mistake, DirectoryScan should catch it. + if (e instanceof FileNotFoundException ) { + LOG.info("Volume {}: verification failed for {} because of " + + "FileNotFoundException. This may be due to a race with write.", + volume.getBasePath(), block); + return; + } + LOG.warn("Reporting bad {} on {}", block, volume.getBasePath()); + try { + scanner.datanode.reportBadBlocks(block); + } catch (IOException ie) { + // This is bad, but not bad enough to shut down the scanner. 
+ LOG.warn("Cannot report bad " + block.getBlockId(), e); + } + } + } + + VolumeScanner(Conf conf, DataNode datanode, FsVolumeReference ref) { + this.conf = conf; + this.datanode = datanode; + this.ref = ref; + this.volume = ref.getVolume(); + ScanResultHandler handler; + try { + handler = conf.resultHandler.newInstance(); + } catch (Throwable e) { + LOG.error("unable to instantiate {}", conf.resultHandler, e); + handler = new ScanResultHandler(); + } + this.resultHandler = handler; + setName("VolumeScannerThread(" + volume.getBasePath() + ")"); + setDaemon(true); + } + + private void saveBlockIterator(BlockIterator iter) { + try { + iter.save(); + } catch (IOException e) { + LOG.warn("{}: error saving {}.", this, iter, e); + } + } + + private void expireOldScannedBytesRecords(long monotonicMs) { + long newMinute = + TimeUnit.MINUTES.convert(monotonicMs, TimeUnit.MILLISECONDS); + newMinute = newMinute % MINUTES_PER_HOUR; + if (curMinute == newMinute) { + return; + } + // If a minute or more has gone past since we last updated the scannedBytes + // array, zero out the slots corresponding to those minutes. + for (long m = curMinute + 1; m <= newMinute; m++) { + LOG.trace("{}: updateScannedBytes is zeroing out slot {}. " + + "curMinute = {}; newMinute = {}", this, m % MINUTES_PER_HOUR, + curMinute, newMinute); + scannedBytesSum -= scannedBytes[(int)(m % MINUTES_PER_HOUR)]; + scannedBytes[(int)(m % MINUTES_PER_HOUR)] = 0; + } + curMinute = newMinute; + } + + /** + * Find a usable block iterator.<p/> + * + * We will consider available block iterators in order. This property is + * important so that we don't keep rescanning the same block pool id over + * and over, while other block pools stay unscanned.<p/> + * + * A block pool is always ready to scan if the iterator is not at EOF. 
If + * the iterator is at EOF, the block pool will be ready to scan when + * conf.scanPeriodMs milliseconds have elapsed since the iterator was last + * rewound.<p/> + * + * @return 0 if we found a usable block iterator; the + * length of time we should delay before + * checking again otherwise. + */ + private synchronized long findNextUsableBlockIter() { + int numBlockIters = blockIters.size(); + if (numBlockIters == 0) { + LOG.debug("{}: no block pools are registered.", this); + return Long.MAX_VALUE; + } + int curIdx; + if (curBlockIter == null) { + curIdx = 0; + } else { + curIdx = blockIters.indexOf(curBlockIter); + Preconditions.checkState(curIdx >= 0); + } + // Note that this has to be wall-clock time, not monotonic time. This is + // because the time saved in the cursor file is a wall-clock time. We do + // not want to save a monotonic time in the cursor file, because it resets + // every time the machine reboots (on most platforms). + long nowMs = Time.now(); + long minTimeoutMs = Long.MAX_VALUE; + for (int i = 0; i < numBlockIters; i++) { + int idx = (curIdx + i + 1) % numBlockIters; + BlockIterator iter = blockIters.get(idx); + if (!iter.atEnd()) { + LOG.info("Now scanning bpid {} on volume {}", + iter.getBlockPoolId(), volume.getBasePath()); + curBlockIter = iter; + return 0L; + } + long iterStartMs = iter.getIterStartMs(); + long waitMs = (iterStartMs + conf.scanPeriodMs) - nowMs; + if (waitMs <= 0) { + iter.rewind(); + LOG.info("Now rescanning bpid {} on volume {}, after more than " + + "{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(), + TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS)); + curBlockIter = iter; + return 0L; + } + minTimeoutMs = Math.min(minTimeoutMs, waitMs); + } + LOG.info("{}: no suitable block pools found to scan. Waiting {} ms.", + this, minTimeoutMs); + return minTimeoutMs; + } + + /** + * Scan a block. + * + * @param cblock The block to scan. + * @param bytesPerSec The bytes per second to scan at. 
+ * + * @return The length of the block that was scanned, or + * -1 if the block could not be scanned. + */ + private long scanBlock(ExtendedBlock cblock, long bytesPerSec) { + // 'cblock' has a valid blockId and block pool id, but we don't yet know the + // genstamp the block is supposed to have. Ask the FsDatasetImpl for this + // information. + ExtendedBlock block = null; + try { + Block b = volume.getDataset().getStoredBlock( + cblock.getBlockPoolId(), cblock.getBlockId()); + if (b == null) { + LOG.info("FileNotFound while finding block {} on volume {}", + cblock, volume.getBasePath()); + } else { + block = new ExtendedBlock(cblock.getBlockPoolId(), b); + } + } catch (FileNotFoundException e) { + LOG.info("FileNotFoundException while finding block {} on volume {}", + cblock, volume.getBasePath()); + } catch (IOException e) { + LOG.warn("I/O error while finding block {} on volume {}", + cblock, volume.getBasePath()); + } + if (block == null) { + return -1; // block not found. + } + BlockSender blockSender = null; + try { + blockSender = new BlockSender(block, 0, -1, + false, true, true, datanode, null, + CachingStrategy.newDropBehind()); + throttler.setBandwidth(bytesPerSec); + long bytesRead = blockSender.sendBlock(nullStream, null, throttler); + resultHandler.handle(block, null); + return bytesRead; + } catch (IOException e) { + resultHandler.handle(block, e); + } finally { + IOUtils.cleanup(null, blockSender); + } + return -1; + } + + @VisibleForTesting + static boolean calculateShouldScan(long targetBytesPerSec, + long scannedBytesSum) { + long effectiveBytesPerSec = + scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR); + boolean shouldScan = effectiveBytesPerSec <= targetBytesPerSec; + LOG.trace("calculateShouldScan: effectiveBytesPerSec = {}, and " + + "targetBytesPerSec = {}. shouldScan = {}", + effectiveBytesPerSec, targetBytesPerSec, shouldScan); + return shouldScan; + } + + /** + * Run an iteration of the VolumeScanner loop. 
+ * + * @return The number of milliseconds to delay before running the loop + * again, or 0 to re-run the loop immediately. + */ + private long runLoop() { + long bytesScanned = -1; + boolean scanError = false; + ExtendedBlock block = null; + try { + long monotonicMs = Time.monotonicNow(); + expireOldScannedBytesRecords(monotonicMs); + + if (!calculateShouldScan(conf.targetBytesPerSec, scannedBytesSum)) { + // If neededBytesPerSec is too low, then wait few seconds for some old + // scannedBytes records to expire. + return 30000L; + } + + // Find a usable block pool to scan. + if ((curBlockIter == null) || curBlockIter.atEnd()) { + long timeout = findNextUsableBlockIter(); + if (timeout > 0) { + LOG.trace("{}: no block pools are ready to scan yet. Waiting " + + "{} ms.", this, timeout); + synchronized (stats) { + stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout; + } + return timeout; + } + synchronized (stats) { + stats.scansSinceRestart++; + stats.blocksScannedInCurrentPeriod = 0; + stats.nextBlockPoolScanStartMs = -1; + } + return 0L; + } + + try { + block = curBlockIter.nextBlock(); + } catch (IOException e) { + // There was an error listing the next block in the volume. This is a + // serious issue. + LOG.warn("{}: nextBlock error on {}", this, curBlockIter); + // On the next loop iteration, curBlockIter#eof will be set to true, and + // we will pick a different block iterator. + return 0L; + } + if (block == null) { + // The BlockIterator is at EOF. 
+ LOG.info("{}: finished scanning block pool {}", + this, curBlockIter.getBlockPoolId()); + saveBlockIterator(curBlockIter); + return 0; + } + long saveDelta = monotonicMs - curBlockIter.getLastSavedMs(); + if (saveDelta >= conf.cursorSaveMs) { + LOG.debug("{}: saving block iterator {} after {} ms.", + this, curBlockIter, saveDelta); + saveBlockIterator(curBlockIter); + } + bytesScanned = scanBlock(block, conf.targetBytesPerSec); + if (bytesScanned >= 0) { + scannedBytesSum += bytesScanned; + scannedBytes[(int)(curMinute % MINUTES_PER_HOUR)] += bytesScanned; + } else { + scanError = true; + } + return 0L; + } finally { + synchronized (stats) { + stats.bytesScannedInPastHour = scannedBytesSum; + if (bytesScanned >= 0) { + stats.blocksScannedInCurrentPeriod++; + stats.blocksScannedSinceRestart++; + } + if (scanError) { + stats.scanErrorsSinceRestart++; + } + if (block != null) { + stats.lastBlockScanned = block; + } + if (curBlockIter == null) { + stats.eof = true; + stats.blockPoolPeriodEndsMs = -1; + } else { + stats.eof = curBlockIter.atEnd(); + stats.blockPoolPeriodEndsMs = + curBlockIter.getIterStartMs() + conf.scanPeriodMs; + } + } + } + } + + @Override + public void run() { + try { + LOG.trace("{}: thread starting.", this); + resultHandler.setup(this); + try { + long timeout = 0; + while (true) { + // Take the lock to check if we should stop. + synchronized (this) { + if (stopping) { + break; + } + if (timeout > 0) { + wait(timeout); + if (stopping) { + break; + } + } + } + timeout = runLoop(); + } + } catch (InterruptedException e) { + // We are exiting because of an InterruptedException, + // probably sent by VolumeScanner#shutdown. + LOG.trace("{} exiting because of InterruptedException.", this); + } catch (Throwable e) { + LOG.error("{} exiting because of exception ", this, e); + } + LOG.info("{} exiting.", this); + // Save the current position of all block iterators and close them. 
+ for (BlockIterator iter : blockIters) { + saveBlockIterator(iter); + IOUtils.cleanup(null, iter); + } + } finally { + // When the VolumeScanner exits, release the reference we were holding + // on the volume. This will allow the volume to be removed later. + IOUtils.cleanup(null, ref); + } + } + + @Override + public String toString() { + return "VolumeScanner(" + volume.getBasePath() + + ", " + volume.getStorageID() + ")"; + } + + /** + * Shut down this scanner. + */ + public synchronized void shutdown() { + stopping = true; + notify(); + this.interrupt(); + } + + /** + * Allow the scanner to scan the given block pool. + * + * @param bpid The block pool id. + */ + public synchronized void enableBlockPoolId(String bpid) { + for (BlockIterator iter : blockIters) { + if (iter.getBlockPoolId().equals(bpid)) { + LOG.warn("{}: already enabled scanning on block pool {}", this, bpid); + return; + } + } + BlockIterator iter = null; + try { + // Load a block iterator for the next block pool on the volume. + iter = volume.loadBlockIterator(bpid, BLOCK_ITERATOR_NAME); + LOG.trace("{}: loaded block iterator for {}.", this, bpid); + } catch (FileNotFoundException e) { + LOG.debug("{}: failed to load block iterator: " + e.getMessage(), this); + } catch (IOException e) { + LOG.warn("{}: failed to load block iterator.", this, e); + } + if (iter == null) { + iter = volume.newBlockIterator(bpid, BLOCK_ITERATOR_NAME); + LOG.trace("{}: created new block iterator for {}.", this, bpid); + } + iter.setMaxStalenessMs(conf.maxStalenessMs); + blockIters.add(iter); + notify(); + } + + /** + * Disallow the scanner from scanning the given block pool. + * + * @param bpid The block pool id. 
+ */ + public synchronized void disableBlockPoolId(String bpid) { + Iterator<BlockIterator> i = blockIters.iterator(); + while (i.hasNext()) { + BlockIterator iter = i.next(); + if (iter.getBlockPoolId().equals(bpid)) { + LOG.trace("{}: disabling scanning on block pool {}", this, bpid); + i.remove(); + IOUtils.cleanup(null, iter); + if (curBlockIter == iter) { + curBlockIter = null; + } + notify(); + return; + } + } + LOG.warn("{}: can't remove block pool {}, because it was never " + + "added.", this, bpid); + } + + @VisibleForTesting + Statistics getStatistics() { + synchronized (stats) { + return new Statistics(stats); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 0d5de81..162e306 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -90,24 +90,30 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { } } - /** - * Create rolling logs. - * - * @param prefix the prefix of the log names. - * @return rolling logs - */ - public RollingLogs createRollingLogs(String bpid, String prefix - ) throws IOException; - /** @return a list of volumes. */ public List<V> getVolumes(); - /** Add an array of StorageLocation to FsDataset. */ + /** + * Add a new volume to the FsDataset.<p/> + * + * If the FSDataset supports block scanning, this function registers + * the new volume with the block scanner. + * + * @param location The storage location for the new volume. + * @param nsInfos Namespace information for the new volume. + */ public void addVolume( final StorageLocation location, final List<NamespaceInfo> nsInfos) throws IOException; - /** Removes a collection of volumes from FsDataset. */ + /** + * Removes a collection of volumes from FsDataset. + * + * If the FSDataset supports block scanning, this function removes + * the volumes from the block scanner. + * + * @param volumes The storage locations of the volumes to remove. 
+ */ public void removeVolumes(Collection<StorageLocation> volumes); /** @return a storage with the given storage ID */ @@ -514,6 +520,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { /** * Move block from one storage to another storage */ - public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block, + public ReplicaInfo moveBlockAcrossStorage(final ExtendedBlock block, StorageType targetStorageType) throws IOException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java index 3a635b7..1355e31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset; +import java.io.Closeable; import java.io.File; import java.io.IOException; import java.nio.channels.ClosedChannelException; import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; /** * This is an interface for the underlying volume. @@ -69,4 +71,112 @@ public interface FsVolumeSpi { * Release disk space previously reserved for RBW block. */ public void releaseReservedSpace(long bytesToRelease); + + /** + * BlockIterator will return ExtendedBlock entries from a block pool in + * this volume. 
The entries will be returned in sorted order.<p/> + * + * BlockIterator objects themselves do not always have internal + * synchronization, so they can only safely be used by a single thread at a + * time.<p/> + * + * Closing the iterator does not save it. You must call save to save it. + */ + public interface BlockIterator extends Closeable { + /** + * Get the next block.<p/> + * + * Note that this block may be removed in between the time we list it, + * and the time the caller tries to use it, or it may represent a stale + * entry. Callers should handle the case where the returned block no + * longer exists. + * + * @return The next block, or null if there are no + * more blocks. Null if there was an error + * determining the next block. + * + * @throws IOException If there was an error getting the next block in + * this volume. In this case, EOF will be set on + * the iterator. + */ + public ExtendedBlock nextBlock() throws IOException; + + /** + * Returns true if we got to the end of the block pool. + */ + public boolean atEnd(); + + /** + * Repositions the iterator at the beginning of the block pool. + */ + public void rewind(); + + /** + * Save this block iterator to the underlying volume. + * Any existing saved block iterator with this name will be overwritten. + * maxStalenessMs will not be saved. + * + * @throws IOException If there was an error when saving the block + * iterator. + */ + public void save() throws IOException; + + /** + * Set the maximum staleness of entries that we will return.<p/> + * + * A maximum staleness of 0 means we will never return stale entries; a + * larger value will allow us to reduce resource consumption in exchange + * for returning more potentially stale entries. Even with staleness set + * to 0, consumers of this API must handle race conditions where block + * disappear before they can be processed. 
+ */ + public void setMaxStalenessMs(long maxStalenessMs); + + /** + * Get the wall-clock time, measured in milliseconds since the Epoch, + * when this iterator was created. + */ + public long getIterStartMs(); + + /** + * Get the wall-clock time, measured in milliseconds since the Epoch, + * when this iterator was last saved. Returns iterStartMs if the + * iterator was never saved. + */ + public long getLastSavedMs(); + + /** + * Get the id of the block pool which this iterator traverses. + */ + public String getBlockPoolId(); + } + + /** + * Create a new block iterator. It will start at the beginning of the + * block set. + * + * @param bpid The block pool id to iterate over. + * @param name The name of the block iterator to create. + * + * @return The new block iterator. + */ + public BlockIterator newBlockIterator(String bpid, String name); + + /** + * Load a saved block iterator. + * + * @param bpid The block pool id to iterate over. + * @param name The name of the block iterator to load. + * + * @return The saved block iterator. + * @throws IOException If there was an IO error loading the saved + * block iterator. + */ + public BlockIterator loadBlockIterator(String bpid, String name) + throws IOException; + + /** + * Get the FSDatasetSpi which this volume is a part of. 
+ */ + public FsDatasetSpi getDataset(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java deleted file mode 100644 index 5d54770..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode.fsdataset; - -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; - -/** - * Rolling logs consist of a current log and a set of previous logs. - * - * The implementation should support a single appender and multiple readers. - */ -public interface RollingLogs { - /** - * To iterate the lines of the logs. 
- */ - public interface LineIterator extends Iterator<String>, Closeable { - /** Is the iterator iterating the previous? */ - public boolean isPrevious(); - - /** - * Is the last read entry from previous? This should be called after - * reading. - */ - public boolean isLastReadFromPrevious(); - } - - /** - * To append text to the logs. - */ - public interface Appender extends Appendable, Closeable { - } - - /** - * Create an iterator to iterate the lines in the logs. - * - * @param skipPrevious Should it skip reading the previous log? - * @return a new iterator. - */ - public LineIterator iterator(boolean skipPrevious) throws IOException; - - /** - * @return the only appender to append text to the logs. - * The same object is returned if it is invoked multiple times. - */ - public Appender appender(); - - /** - * Roll current to previous. - * - * @return true if the rolling succeeded. - * When it returns false, it is not equivalent to an error. - * It means that the rolling cannot be performed at the moment, - * e.g. the logs are being read. 
- */ - public boolean roll() throws IOException; -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index f990faf..c00d467 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -67,7 +67,7 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; -import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; +import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; @@ -89,7 +89,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; 
import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; @@ -284,7 +283,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); - volumes = new FsVolumeList(volsFailed, blockChooserImpl); + volumes = new FsVolumeList(volsFailed, datanode.getBlockScanner(), + blockChooserImpl); asyncDiskService = new FsDatasetAsyncDiskService(datanode); asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode); @@ -312,6 +312,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // storageMap and asyncDiskService, consistent. FsVolumeImpl fsVolume = new FsVolumeImpl( this, sd.getStorageUuid(), dir, this.conf, storageType); + FsVolumeReference ref = fsVolume.obtainReference(); ReplicaMap tempVolumeMap = new ReplicaMap(this); fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker); @@ -322,7 +323,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { DatanodeStorage.State.NORMAL, storageType)); asyncDiskService.addVolume(sd.getCurrentDir()); - volumes.addVolume(fsVolume); + volumes.addVolume(ref); } LOG.info("Added volume - " + dir + ", StorageType: " + storageType); @@ -361,6 +362,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { throw MultipleIOException.createIOException(exceptions); } + final FsVolumeReference ref = fsVolume.obtainReference(); setupAsyncLazyPersistThread(fsVolume); builder.build(); @@ -371,7 +373,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { DatanodeStorage.State.NORMAL, storageType)); asyncDiskService.addVolume(sd.getCurrentDir()); - volumes.addVolume(fsVolume); + volumes.addVolume(ref); } LOG.info("Added volume - " + dir + ", StorageType: " + storageType); } @@ -415,9 +417,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { it.remove(); } } - // Delete blocks from the block scanner in 
batch. - datanode.getBlockScanner().deleteBlocks(bpid, - blocks.toArray(new Block[blocks.size()])); } storageMap.remove(sd.getStorageUuid()); @@ -771,7 +770,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } // Replace the old block if any to reschedule the scanning. - datanode.getBlockScanner().addBlock(block, false); return replicaInfo; } @@ -2006,10 +2004,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { // Block is in memory and not on the disk // Remove the block from volumeMap volumeMap.remove(bpid, blockId); - final DataBlockScanner blockScanner = datanode.getBlockScanner(); - if (blockScanner != null) { - blockScanner.deleteBlock(bpid, new Block(blockId)); - } if (vol.isTransientStorage()) { ramDiskReplicaTracker.discardReplica(bpid, blockId, true); } @@ -2032,12 +2026,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId, diskFile.length(), diskGS, vol, diskFile.getParentFile()); volumeMap.add(bpid, diskBlockInfo); - final DataBlockScanner blockScanner = datanode.getBlockScanner(); - if (!vol.isTransientStorage()) { - if (blockScanner != null) { - blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false); - } - } else { + if (vol.isTransientStorage()) { ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol); } LOG.warn("Added missing block to memory " + diskBlockInfo); @@ -2540,23 +2529,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { dataStorage.clearRollingUpgradeMarker(bpid); } - @Override - public RollingLogs createRollingLogs(String bpid, String prefix - ) throws IOException { - String dir = null; - final List<FsVolumeImpl> volumes = getVolumes(); - for (FsVolumeImpl vol : volumes) { - String bpDir = vol.getPath(bpid); - if (RollingLogsImpl.isFilePresent(bpDir, prefix)) { - dir = bpDir; - break; - } - } - if (dir == null) { - dir = volumes.get(0).getPath(bpid); - } - return new RollingLogsImpl(dir, prefix); - } 
@Override public void onCompleteLazyPersist(String bpId, long blockId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 7c8384d..5ce2710 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -17,9 +17,18 @@ */ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; +import java.io.BufferedWriter; import java.io.File; +import java.io.FileOutputStream; +import java.io.FilenameFilter; import java.io.IOException; import java.nio.channels.ClosedChannelException; +import java.io.OutputStreamWriter; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; @@ -41,15 +50,24 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import 
org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.CloseableReferenceCount; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.Time; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The underlying volume used to store replica. @@ -59,6 +77,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; @InterfaceAudience.Private @VisibleForTesting public class FsVolumeImpl implements FsVolumeSpi { + public static final Logger LOG = + LoggerFactory.getLogger(FsVolumeImpl.class); + private final FsDatasetImpl dataset; private final String storageID; private final StorageType storageType; @@ -395,6 +416,332 @@ public class FsVolumeImpl implements FsVolumeSpi { } } + private enum SubdirFilter implements FilenameFilter { + INSTANCE; + + @Override + public boolean accept(File dir, String name) { + return name.startsWith("subdir"); + } + } + + private enum BlockFileFilter implements FilenameFilter { + INSTANCE; + + @Override + public boolean accept(File dir, String name) { + return !name.endsWith(".meta") && name.startsWith("blk_"); + } + } + + @VisibleForTesting + public static String nextSorted(List<String> arr, String prev) { + int res = 0; + if (prev != null) { + res = Collections.binarySearch(arr, prev); + if (res < 0) { + res = -1 - res; + } else { + res++; + } + } + if (res >= arr.size()) { + return null; + } + return arr.get(res); + } + + private static class BlockIteratorState { + BlockIteratorState() { + lastSavedMs = iterStartMs = Time.now(); + curFinalizedDir = null; + curFinalizedSubDir = null; + curEntry = null; + atEnd = false; + } + + // The wall-clock ms since the epoch at which this iterator was last saved. 
+ @JsonProperty + private long lastSavedMs; + + // The wall-clock ms since the epoch at which this iterator was created. + @JsonProperty + private long iterStartMs; + + @JsonProperty + private String curFinalizedDir; + + @JsonProperty + private String curFinalizedSubDir; + + @JsonProperty + private String curEntry; + + @JsonProperty + private boolean atEnd; + } + + /** + * A BlockIterator implementation for FsVolumeImpl. + */ + private class BlockIteratorImpl implements FsVolumeSpi.BlockIterator { + private final File bpidDir; + private final String name; + private final String bpid; + private long maxStalenessMs = 0; + + private List<String> cache; + private long cacheMs; + + private BlockIteratorState state; + + BlockIteratorImpl(String bpid, String name) { + this.bpidDir = new File(currentDir, bpid); + this.name = name; + this.bpid = bpid; + rewind(); + } + + /** + * Get the next subdirectory within the block pool slice. + * + * @return The next subdirectory within the block pool slice, or + * null if there are no more. 
+ */ + private String getNextSubDir(String prev, File dir) + throws IOException { + List<String> children = + IOUtils.listDirectory(dir, SubdirFilter.INSTANCE); + cache = null; + cacheMs = 0; + if (children.size() == 0) { + LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}", + storageID, bpid, dir.getAbsolutePath()); + return null; + } + Collections.sort(children); + String nextSubDir = nextSorted(children, prev); + if (nextSubDir == null) { + LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}", + storageID, bpid, dir.getAbsolutePath()); + } else { + LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " + + "within {}", storageID, bpid, nextSubDir, dir.getAbsolutePath()); + } + return nextSubDir; + } + + private String getNextFinalizedDir() throws IOException { + File dir = Paths.get( + bpidDir.getAbsolutePath(), "current", "finalized").toFile(); + return getNextSubDir(state.curFinalizedDir, dir); + } + + private String getNextFinalizedSubDir() throws IOException { + if (state.curFinalizedDir == null) { + return null; + } + File dir = Paths.get( + bpidDir.getAbsolutePath(), "current", "finalized", + state.curFinalizedDir).toFile(); + return getNextSubDir(state.curFinalizedSubDir, dir); + } + + private List<String> getSubdirEntries() throws IOException { + if (state.curFinalizedSubDir == null) { + return null; // There are no entries in the null subdir. 
+ } + long now = Time.monotonicNow(); + if (cache != null) { + long delta = now - cacheMs; + if (delta < maxStalenessMs) { + return cache; + } else { + LOG.trace("getSubdirEntries({}, {}): purging entries cache for {} " + + "after {} ms.", storageID, bpid, state.curFinalizedSubDir, delta); + cache = null; + } + } + File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized", + state.curFinalizedDir, state.curFinalizedSubDir).toFile(); + List<String> entries = + IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE); + if (entries.size() == 0) { + entries = null; + } else { + Collections.sort(entries); + } + if (entries == null) { + LOG.trace("getSubdirEntries({}, {}): no entries found in {}", + storageID, bpid, dir.getAbsolutePath()); + } else { + LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}", + storageID, bpid, entries.size(), dir.getAbsolutePath()); + } + cache = entries; + cacheMs = now; + return cache; + } + + /** + * Get the next block.<p/> + * + * Each volume has a hierarchical structure.<p/> + * + * <code> + * BPID B0 + * finalized/ + * subdir0 + * subdir0 + * blk_000 + * blk_001 + * ... + * subdir1 + * subdir0 + * ... + * rbw/ + * </code> + * + * When we run out of entries at one level of the structure, we search + * progressively higher levels. For example, when we run out of blk_ + * entries in a subdirectory, we search for the next subdirectory. + * And so on. 
+ */ + @Override + public ExtendedBlock nextBlock() throws IOException { + if (state.atEnd) { + return null; + } + try { + while (true) { + List<String> entries = getSubdirEntries(); + if (entries != null) { + state.curEntry = nextSorted(entries, state.curEntry); + if (state.curEntry == null) { + LOG.trace("nextBlock({}, {}): advancing from {} to next " + + "subdirectory.", storageID, bpid, state.curFinalizedSubDir); + } else { + ExtendedBlock block = + new ExtendedBlock(bpid, Block.filename2id(state.curEntry)); + LOG.trace("nextBlock({}, {}): advancing to {}", + storageID, bpid, block); + return block; + } + } + state.curFinalizedSubDir = getNextFinalizedSubDir(); + if (state.curFinalizedSubDir == null) { + state.curFinalizedDir = getNextFinalizedDir(); + if (state.curFinalizedDir == null) { + state.atEnd = true; + return null; + } + } + } + } catch (IOException e) { + state.atEnd = true; + LOG.error("nextBlock({}, {}): I/O error", storageID, bpid, e); + throw e; + } + } + + @Override + public boolean atEnd() { + return state.atEnd; + } + + @Override + public void rewind() { + cache = null; + cacheMs = 0; + state = new BlockIteratorState(); + } + + @Override + public void save() throws IOException { + state.lastSavedMs = Time.now(); + boolean success = false; + ObjectMapper mapper = new ObjectMapper(); + try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) { + mapper.writerWithDefaultPrettyPrinter().writeValue(writer, state); + success = true; + } finally { + if (!success) { + if (!getTempSaveFile().delete()) { + LOG.debug("save({}, {}): error deleting temporary file.", + storageID, bpid); + } + } + } + Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(), + StandardCopyOption.ATOMIC_MOVE); + if (LOG.isTraceEnabled()) { + LOG.trace("save({}, {}): saved {}", storageID, bpid, + mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state)); + } + } + + public void load()
throws IOException { + ObjectMapper mapper = new ObjectMapper(); + File file = getSaveFile(); + this.state = mapper.reader(BlockIteratorState.class).readValue(file); + LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID, + bpid, name, file.getAbsoluteFile(), + mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state)); + } + + File getSaveFile() { + return new File(bpidDir, name + ".cursor"); + } + + File getTempSaveFile() { + return new File(bpidDir, name + ".cursor.tmp"); + } + + @Override + public void setMaxStalenessMs(long maxStalenessMs) { + this.maxStalenessMs = maxStalenessMs; + } + + @Override + public void close() throws IOException { + // No action needed for this volume implementation. + } + + @Override + public long getIterStartMs() { + return state.iterStartMs; + } + + @Override + public long getLastSavedMs() { + return state.lastSavedMs; + } + + @Override + public String getBlockPoolId() { + return bpid; + } + } + + @Override + public BlockIterator newBlockIterator(String bpid, String name) { + return new BlockIteratorImpl(bpid, name); + } + + @Override + public BlockIterator loadBlockIterator(String bpid, String name) + throws IOException { + BlockIteratorImpl iter = new BlockIteratorImpl(bpid, name); + iter.load(); + return iter; + } + + @Override + public FsDatasetSpi getDataset() { + return dataset; + } + /** * RBW files. They get moved to the finalized block directory when * the block is finalized. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index c837593..ae2f5b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.Time; @@ -42,11 +43,13 @@ class FsVolumeList { private Object checkDirsMutex = new Object(); private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser; + private final BlockScanner blockScanner; private volatile int numFailedVolumes; - FsVolumeList(int failedVols, + FsVolumeList(int failedVols, BlockScanner blockScanner, VolumeChoosingPolicy<FsVolumeImpl> blockChooser) { this.blockChooser = blockChooser; + this.blockScanner = blockScanner; this.numFailedVolumes = failedVols; } @@ -260,13 +263,14 @@ class FsVolumeList { /** * Dynamically add new volumes to the existing volumes that this DN manages. - * @param newVolume the instance of new FsVolumeImpl. + * + * @param ref a reference to the new FsVolumeImpl instance. 
*/ - void addVolume(FsVolumeImpl newVolume) { + void addVolume(FsVolumeReference ref) { while (true) { final FsVolumeImpl[] curVolumes = volumes.get(); final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes); - volumeList.add(newVolume); + volumeList.add((FsVolumeImpl)ref.getVolume()); if (volumes.compareAndSet(curVolumes, volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { break; @@ -274,12 +278,15 @@ class FsVolumeList { if (FsDatasetImpl.LOG.isDebugEnabled()) { FsDatasetImpl.LOG.debug( "The volume list has been changed concurrently, " + - "retry to remove volume: " + newVolume); + "retry to add volume: " + ref.getVolume().getStorageID()); } } } - - FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString()); + if (blockScanner != null) { + blockScanner.addVolumeScanner(ref); + } + FsDatasetImpl.LOG.info("Added new volume: " + + ref.getVolume().getStorageID()); } /** @@ -293,6 +300,9 @@ class FsVolumeList { if (volumeList.remove(target)) { if (volumes.compareAndSet(curVolumes, volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) { + if (blockScanner != null) { + blockScanner.removeVolumeScanner(target); + } try { target.closeAndWait(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java deleted file mode 100644 index 121127d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java +++ /dev/null @@ -1,241 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; - -import com.google.common.base.Charsets; - -class RollingLogsImpl implements RollingLogs { - private static final String CURR_SUFFIX = ".curr"; - private static final String PREV_SUFFIX = ".prev"; - - static boolean isFilePresent(String dir, String filePrefix) { - return new File(dir, filePrefix + CURR_SUFFIX).exists() || - new File(dir, filePrefix + PREV_SUFFIX).exists(); - } - - private final File curr; - private final File prev; - private PrintWriter out; //require synchronized access - - private final Appender appender = new Appender() { - @Override - public Appendable append(CharSequence csq) { - synchronized(RollingLogsImpl.this) { - if (out == null) { - throw new 
IllegalStateException(RollingLogsImpl.this - + " is not yet opened."); - } - out.print(csq); - out.flush(); - } - return this; - } - - @Override - public Appendable append(char c) { - throw new UnsupportedOperationException(); - } - - @Override - public Appendable append(CharSequence csq, int start, int end) { - throw new UnsupportedOperationException(); - } - - @Override - public void close() { - synchronized(RollingLogsImpl.this) { - if (out != null) { - out.close(); - out = null; - } - } - } - }; - - - private final AtomicInteger numReaders = new AtomicInteger(); - - RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{ - curr = new File(dir, filePrefix + CURR_SUFFIX); - prev = new File(dir, filePrefix + PREV_SUFFIX); - out = new PrintWriter(new OutputStreamWriter(new FileOutputStream( - curr, true), Charsets.UTF_8)); - } - - @Override - public Reader iterator(boolean skipPrevFile) throws IOException { - numReaders.incrementAndGet(); - return new Reader(skipPrevFile); - } - - @Override - public Appender appender() { - return appender; - } - - @Override - public boolean roll() throws IOException { - if (numReaders.get() > 0) { - return false; - } - if (!prev.delete() && prev.exists()) { - throw new IOException("Failed to delete " + prev); - } - - synchronized(this) { - appender.close(); - final boolean renamed = curr.renameTo(prev); - out = new PrintWriter(new OutputStreamWriter(new FileOutputStream( - curr, true), Charsets.UTF_8)); - if (!renamed) { - throw new IOException("Failed to rename " + curr + " to " + prev); - } - } - return true; - } - - @Override - public String toString() { - return curr.toString(); - } - - /** - * This is used to read the lines in order. 
- * If the data is not read completely (i.e, untill hasNext() returns - * false), it needs to be explicitly - */ - private class Reader implements RollingLogs.LineIterator { - private File file; - private File lastReadFile; - private BufferedReader reader; - private String line; - private boolean closed = false; - - private Reader(boolean skipPrevFile) throws IOException { - reader = null; - file = skipPrevFile? curr : prev; - readNext(); - } - - @Override - public boolean isPrevious() { - return file == prev; - } - - @Override - public boolean isLastReadFromPrevious() { - return lastReadFile == prev; - } - - private boolean openFile() throws IOException { - - for(int i=0; i<2; i++) { - if (reader != null || i > 0) { - // move to next file - file = isPrevious()? curr : null; - } - if (file == null) { - return false; - } - if (file.exists()) { - break; - } - } - - if (reader != null ) { - reader.close(); - reader = null; - } - - reader = new BufferedReader(new InputStreamReader(new FileInputStream( - file), Charsets.UTF_8)); - return true; - } - - // read next line if possible. - private void readNext() throws IOException { - line = null; - try { - if (reader != null && (line = reader.readLine()) != null) { - return; - } - // move to the next file. 
- if (openFile()) { - readNext(); - } - } finally { - if (!hasNext()) { - close(); - } - } - } - - @Override - public boolean hasNext() { - return line != null; - } - - @Override - public String next() { - String curLine = line; - try { - lastReadFile = file; - readNext(); - } catch (IOException e) { - DataBlockScanner.LOG.warn("Failed to read next line.", e); - } - return curLine; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws IOException { - if (!closed) { - try { - if (reader != null) { - reader.close(); - } - } finally { - file = null; - reader = null; - closed = true; - final int n = numReaders.decrementAndGet(); - assert(n >= 0); - } - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 4d60792..c24f7be 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1005,6 +1005,26 @@ </property> <property> + <name>dfs.datanode.scan.period.hours</name> + <value>0</value> + <description> + If this is 0 or negative, the DataNode's block scanner will be + disabled. If this is positive, the DataNode will not scan any + individual block more than once in the specified scan period. + </description> +</property> + +<property> + <name>dfs.block.scanner.volume.bytes.per.second</name> + <value>1048576</value> + <description> + If this is 0, the DataNode's block scanner will be disabled. If this + is positive, this is the number of bytes per second that the DataNode's + block scanner will try to scan from each volume. 
+ </description> +</property> + +<property> <name>dfs.datanode.readahead.bytes</name> <value>4193404</value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/6e62a1a6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 15f5f2e..0eef46f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1637,4 +1637,20 @@ public class DFSTestUtil { } }, 100, waitTime); } + + /** + * Change the length of a block at datanode dnIndex + */ + public static boolean changeReplicaLength(MiniDFSCluster cluster, + ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException { + File blockFile = cluster.getBlockFile(dnIndex, blk); + if (blockFile != null && blockFile.exists()) { + try (RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw")) { + raFile.setLength(raFile.length()+lenDelta); + } + return true; + } + LOG.info("failed to change length of block " + blk); + return false; + } }
