HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton via Colin P. McCabe)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7a3c381b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7a3c381b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7a3c381b

Branch: refs/heads/HDFS-7240
Commit: 7a3c381b39887a02e944fa98287afd0eb4db3560
Parents: 7fe521b
Author: Colin Patrick Mccabe <cmcc...@cloudera.com>
Authored: Sat Sep 26 04:09:06 2015 -0700
Committer: Colin Patrick Mccabe <cmcc...@cloudera.com>
Committed: Sat Sep 26 04:09:06 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   5 +
 .../hdfs/server/datanode/DirectoryScanner.java  | 337 +++++++++++++++++--
 .../src/main/resources/hdfs-default.xml         |  20 ++
 .../server/datanode/TestDirectoryScanner.java   | 234 ++++++++++++-
 5 files changed, 570 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a3c381b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index e3d9660..b3940b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -974,6 +974,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9133. ExternalBlockReader and ReplicaAccessor need to return -1 on read
     when at EOF. (Colin Patrick McCabe via Lei (Eddy) Xu)
 
+    HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton
+    via Colin P. McCabe)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a3c381b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 780484c..3bad9d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -403,6 +403,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT = 21600;
   public static final String  DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY = "dfs.datanode.directoryscan.threads";
   public static final int     DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT = 1;
+  public static final String
+      DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY =
+      "dfs.datanode.directoryscan.throttle.limit.ms.per.sec";
+  public static final int
+      DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT = 1000;
   public static final String  DFS_DATANODE_DNS_INTERFACE_KEY = "dfs.datanode.dns.interface";
   public static final String  DFS_DATANODE_DNS_INTERFACE_DEFAULT = "default";
   public static final String  DFS_DATANODE_DNS_NAMESERVER_KEY = "dfs.datanode.dns.nameserver";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a3c381b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 3383d0e..b8ea5bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -47,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StopWatch;
 import org.apache.hadoop.util.Time;
 
 /**
@@ -56,27 +59,59 @@ import org.apache.hadoop.util.Time;
 @InterfaceAudience.Private
 public class DirectoryScanner implements Runnable {
   private static final Log LOG = LogFactory.getLog(DirectoryScanner.class);
+  private static final int MILLIS_PER_SECOND = 1000;
+  private static final String START_MESSAGE =
+      "Periodic Directory Tree Verification scan"
+      + " starting at %dms with interval of %dms";
+  private static final String START_MESSAGE_WITH_THROTTLE = START_MESSAGE
+      + " and throttle limit of %dms/s";
 
   private final FsDatasetSpi<?> dataset;
   private final ExecutorService reportCompileThreadPool;
   private final ScheduledExecutorService masterThread;
   private final long scanPeriodMsecs;
+  private final int throttleLimitMsPerSec;
   private volatile boolean shouldRun = false;
   private boolean retainDiffs = false;
   private final DataNode datanode;
 
+  /**
+   * Total combined wall clock time (in milliseconds) spent by the report
+   * compiler threads executing.  Used for testing purposes.
+   */
+  @VisibleForTesting
+  final AtomicLong timeRunningMs = new AtomicLong(0L);
+  /**
+   * Total combined wall clock time (in milliseconds) spent by the report
+   * compiler threads blocked by the throttle.  Used for testing purposes.
+   */
+  @VisibleForTesting
+  final AtomicLong timeWaitingMs = new AtomicLong(0L);
+  /**
+   * The complete list of block differences indexed by block pool ID.
+   */
+  @VisibleForTesting
   final ScanInfoPerBlockPool diffs = new ScanInfoPerBlockPool();
+  /**
+   * Statistics about the block differences in each blockpool, indexed by
+   * block pool ID.
+   */
+  @VisibleForTesting
   final Map<String, Stats> stats = new HashMap<String, Stats>();
   
   /**
-   * Allow retaining diffs for unit test and analysis
-   * @param b - defaults to false (off)
+   * Allow retaining diffs for unit test and analysis. Defaults to false (off)
+   * @param b whether to retain diffs
    */
+  @VisibleForTesting
   void setRetainDiffs(boolean b) {
     retainDiffs = b;
   }
 
-  /** Stats tracked for reporting and testing, per blockpool */
+  /**
+   * Stats tracked for reporting and testing, per blockpool
+   */
+  @VisibleForTesting
   static class Stats {
     final String bpid;
     long totalBlocks = 0;
@@ -86,6 +121,10 @@ public class DirectoryScanner implements Runnable {
     long mismatchBlocks = 0;
     long duplicateBlocks = 0;
     
+    /**
+     * Create a new Stats object for the given blockpool ID.
+     * @param bpid blockpool ID
+     */
     public Stats(String bpid) {
       this.bpid = bpid;
     }
@@ -99,18 +138,32 @@ public class DirectoryScanner implements Runnable {
       + ", mismatched blocks:" + mismatchBlocks;
     }
   }
-  
+
+  /**
+   * Helper class for compiling block info reports from report compiler threads.
+   */
   static class ScanInfoPerBlockPool extends 
                      HashMap<String, LinkedList<ScanInfo>> {
     
     private static final long serialVersionUID = 1L;
 
+    /**
+     * Create a new info list.
+     */
     ScanInfoPerBlockPool() {super();}
-    
+
+    /**
+     * Create a new info list initialized to the given expected size.
+     * See {@link java.util.HashMap#HashMap(int)}.
+     *
+     * @param sz initial expected size
+     */
     ScanInfoPerBlockPool(int sz) {super(sz);}
     
     /**
      * Merges {@code that} ScanInfoPerBlockPool into this one
+     *
+     * @param that the ScanInfoPerBlockPool to merge
      */
     public void addAll(ScanInfoPerBlockPool that) {
       if (that == null) return;
@@ -132,6 +185,7 @@ public class DirectoryScanner implements Runnable {
     /**
      * Convert all the LinkedList values in this ScanInfoPerBlockPool map
      * into sorted arrays, and return a new map of these arrays per blockpool
+     *
      * @return a map of ScanInfo arrays per blockpool
      */
     public Map<String, ScanInfo[]> toSortedArrays() {
@@ -208,6 +262,9 @@ public class DirectoryScanner implements Runnable {
      * For example, the condensed version of /foo//bar is /foo/bar
      * Unlike {@link File#getCanonicalPath()}, this will never perform I/O
      * on the filesystem.
+     *
+     * @param path the path to condense
+     * @return the condensed path
      */
     private static String getCondensedPath(String path) {
       return CONDENSED_PATH_REGEX.matcher(path).
@@ -230,6 +287,15 @@ public class DirectoryScanner implements Runnable {
       throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
     }
 
+    /**
+     * Create a ScanInfo object for a block. This constructor will examine
+     * the block data and meta-data files.
+     *
+     * @param blockId the block ID
+     * @param blockFile the path to the block data file
+     * @param metaFile the path to the block meta-data file
+     * @param vol the volume that contains the block
+     */
     ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
       this.blockId = blockId;
       String condensedVolPath = vol == null ? null :
@@ -248,15 +314,31 @@ public class DirectoryScanner implements Runnable {
       this.volume = vol;
     }
 
+    /**
+     * Returns the block data file.
+     *
+     * @return the block data file
+     */
     File getBlockFile() {
       return (blockSuffix == null) ? null :
         new File(volume.getBasePath(), blockSuffix);
     }
 
+    /**
+     * Return the length of the data block. The length returned is the length
+     * cached when this object was created.
+     *
+     * @return the length of the data block
+     */
     long getBlockFileLength() {
       return blockFileLength;
     }
 
+    /**
+     * Returns the block meta data file or null if there isn't one.
+     *
+     * @return the block meta data file
+     */
     File getMetaFile() {
       if (metaSuffix == null) {
         return null;
@@ -267,10 +349,20 @@ public class DirectoryScanner implements Runnable {
       }
     }
 
+    /**
+     * Returns the block ID.
+     *
+     * @return the block ID
+     */
     long getBlockId() {
       return blockId;
     }
 
+    /**
+     * Returns the volume that contains the block that this object describes.
+     *
+     * @return the volume
+     */
     FsVolumeSpi getVolume() {
       return volume;
     }
@@ -309,12 +401,44 @@ public class DirectoryScanner implements Runnable {
     }
   }
 
+  /**
+   * Create a new directory scanner, but don't start it running yet.
+   *
+   * @param datanode the parent datanode
+   * @param dataset the dataset to scan
+   * @param conf the Configuration object
+   */
   DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration conf) {
     this.datanode = datanode;
     this.dataset = dataset;
     int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
         DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT);
-    scanPeriodMsecs = interval * 1000L; //msec
+    scanPeriodMsecs = interval * MILLIS_PER_SECOND; //msec
+
+    int throttle =
+        conf.getInt(
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
+
+    if ((throttle > MILLIS_PER_SECOND) || (throttle <= 0)) {
+      if (throttle > MILLIS_PER_SECOND) {
+        LOG.error(
+            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+            + " set to value above 1000 ms/sec. Assuming default value of " +
+            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
+      } else {
+        LOG.error(
+            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY
+            + " set to value below 1 ms/sec. Assuming default value of " +
+            DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT);
+      }
+
+      throttleLimitMsPerSec =
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_DEFAULT;
+    } else {
+      throttleLimitMsPerSec = throttle;
+    }
+
     int threads = 
         conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
                     DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_DEFAULT);
@@ -325,30 +449,50 @@ public class DirectoryScanner implements Runnable {
         new Daemon.DaemonFactory());
   }
 
+  /**
+   * Start the scanner.  The scanner will run every
+   * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY} seconds.
+   */
   void start() {
     shouldRun = true;
     long offset = ThreadLocalRandom.current().nextInt(
-        (int) (scanPeriodMsecs/1000L)) * 1000L; //msec
+        (int) (scanPeriodMsecs/MILLIS_PER_SECOND)) * MILLIS_PER_SECOND; //msec
     long firstScanTime = Time.now() + offset;
-    LOG.info("Periodic Directory Tree Verification scan starting at " 
-        + firstScanTime + " with interval " + scanPeriodMsecs);
+    String logMsg;
+
+    if (throttleLimitMsPerSec < MILLIS_PER_SECOND) {
+      logMsg = String.format(START_MESSAGE_WITH_THROTTLE, firstScanTime,
+          scanPeriodMsecs, throttleLimitMsPerSec);
+    } else {
+      logMsg = String.format(START_MESSAGE, firstScanTime, scanPeriodMsecs);
+    }
+
+    LOG.info(logMsg);
     masterThread.scheduleAtFixedRate(this, offset, scanPeriodMsecs, 
                                      TimeUnit.MILLISECONDS);
   }
   
-  // for unit test
+  /**
+   * Return whether the scanner has been started.
+   *
+   * @return whether the scanner has been started
+   */
+  @VisibleForTesting
   boolean getRunStatus() {
     return shouldRun;
   }
 
+  /**
+   * Clear the current cache of diffs and statistics.
+   */
   private void clear() {
     diffs.clear();
     stats.clear();
   }
 
   /**
-   * Main program loop for DirectoryScanner
-   * Runs "reconcile()" periodically under the masterThread.
+   * Main program loop for DirectoryScanner.  Runs {@link #reconcile()}
+   * and handles any exceptions.
    */
   @Override
   public void run() {
@@ -372,6 +516,12 @@ public class DirectoryScanner implements Runnable {
     }
   }
   
+  /**
+   * Stops the directory scanner.  This method will wait for 1 minute for the
+   * main thread to exit and an additional 1 minute for the report compilation
+   * threads to exit.  If a thread does not exit in that time period, it is
+   * left running, and an error is logged.
+   */
   void shutdown() {
     if (!shouldRun) {
       LOG.warn("DirectoryScanner: shutdown has been called, but periodic 
scanner not started");
@@ -380,7 +530,11 @@ public class DirectoryScanner implements Runnable {
     }
     shouldRun = false;
     if (masterThread != null) masterThread.shutdown();
-    if (reportCompileThreadPool != null) reportCompileThreadPool.shutdown();
+
+    if (reportCompileThreadPool != null) {
+      reportCompileThreadPool.shutdownNow();
+    }
+
     if (masterThread != null) {
       try {
         masterThread.awaitTermination(1, TimeUnit.MINUTES);
@@ -403,6 +557,7 @@ public class DirectoryScanner implements Runnable {
   /**
    * Reconcile differences between disk and in-memory blocks
    */
+  @VisibleForTesting
   void reconcile() throws IOException {
     scan();
     for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
@@ -421,7 +576,7 @@ public class DirectoryScanner implements Runnable {
    * Scan for the differences between disk and in-memory blocks
    * Scan only the "finalized blocks" lists of both disk and memory.
    */
-  void scan() {
+  private void scan() {
     clear();
     Map<String, ScanInfo[]> diskReport = getDiskReport();
 
@@ -509,8 +664,13 @@ public class DirectoryScanner implements Runnable {
   }
 
   /**
-   * Block is found on the disk. In-memory block is missing or does not match
-   * the block on the disk
+   * Add the ScanInfo object to the list of differences and adjust the stats
+   * accordingly.  This method is called when a block is found on the disk,
+   * but the in-memory block is missing or does not match the block on the disk.
+   *
+   * @param diffRecord the list to which to add the info
+   * @param statsRecord the stats to update
+   * @param info the differing info
    */
   private void addDifference(LinkedList<ScanInfo> diffRecord, 
                              Stats statsRecord, ScanInfo info) {
@@ -519,7 +679,15 @@ public class DirectoryScanner implements Runnable {
     diffRecord.add(info);
   }
 
-  /** Block is not found on the disk */
+  /**
+   * Add a new ScanInfo object to the list of differences and adjust the stats
+   * accordingly.  This method is called when a block is not found on the disk.
+   *
+   * @param diffRecord the list to which to add the info
+   * @param statsRecord the stats to update
+   * @param blockId the id of the missing block
+   * @param vol the volume that contains the missing block
+   */
   private void addDifference(LinkedList<ScanInfo> diffRecord,
                              Stats statsRecord, long blockId,
                              FsVolumeSpi vol) {
@@ -528,7 +696,13 @@ public class DirectoryScanner implements Runnable {
     diffRecord.add(new ScanInfo(blockId, null, null, vol));
   }
 
-  /** Get lists of blocks on the disk sorted by blockId, per blockpool */
+  /**
+   * Get the lists of blocks on the disks in the dataset, sorted by blockId.
+   * The returned map contains one entry per blockpool, keyed by the blockpool
+   * ID.
+   *
+   * @return a map of sorted arrays of block information
+   */
   private Map<String, ScanInfo[]> getDiskReport() {
     ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
     ScanInfoPerBlockPool[] dirReports = null;
@@ -555,6 +729,12 @@ public class DirectoryScanner implements Runnable {
           compilersInProgress.entrySet()) {
         try {
           dirReports[report.getKey()] = report.getValue().get();
+
+          // If our compiler threads were interrupted, give up on this run
+          if (dirReports[report.getKey()] == null) {
+            dirReports = null;
+            break;
+          }
         } catch (Exception ex) {
           LOG.error("Error compiling report", ex);
           // Propagate ex to DataBlockScanner to deal with
@@ -573,38 +753,102 @@ public class DirectoryScanner implements Runnable {
     return list.toSortedArrays();
   }
 
+  /**
+   * Helper method to determine if a file name is consistent with a block
+   * meta-data file.
+   *
+   * @param blockId the block ID
+   * @param metaFile the file to check
+   * @return whether the file name is a block meta-data file name
+   */
   private static boolean isBlockMetaFile(String blockId, String metaFile) {
     return metaFile.startsWith(blockId)
         && metaFile.endsWith(Block.METADATA_EXTENSION);
   }
 
-  private static class ReportCompiler 
-  implements Callable<ScanInfoPerBlockPool> {
+  /**
+   * The ReportCompiler class encapsulates the process of searching a datanode's
+   * disks for block information.  It operates by performing a DFS of the
+   * volume to discover block information.
+   *
+   * When the ReportCompiler discovers block information, it creates a new
+   * ScanInfo object for it and adds that object to its report list.  The report
+   * list is returned by the {@link #call()} method.
+   */
+  private class ReportCompiler implements Callable<ScanInfoPerBlockPool> {
     private final FsVolumeSpi volume;
     private final DataNode datanode;
+    // Variable for tracking time spent running for throttling purposes
+    private final StopWatch throttleTimer = new StopWatch();
+    // Variable for tracking time spent running and waiting for testing
+    // purposes
+    private final StopWatch perfTimer = new StopWatch();
+
+    /**
+     * The associated thread.  Used for testing purposes only.
+     */
+    @VisibleForTesting
+    Thread currentThread;
 
+    /**
+     * Create a report compiler for the given volume on the given datanode.
+     *
+     * @param datanode the target datanode
+     * @param volume the target volume
+     */
     public ReportCompiler(DataNode datanode, FsVolumeSpi volume) {
       this.datanode = datanode;
       this.volume = volume;
     }
 
+    /**
+     * Run this report compiler thread.
+     *
+     * @return the block info report list
+     * @throws IOException if the block pool isn't found
+     */
     @Override
-    public ScanInfoPerBlockPool call() throws Exception {
+    public ScanInfoPerBlockPool call() throws IOException {
+      currentThread = Thread.currentThread();
+
       String[] bpList = volume.getBlockPoolList();
       ScanInfoPerBlockPool result = new ScanInfoPerBlockPool(bpList.length);
       for (String bpid : bpList) {
-        LinkedList<ScanInfo> report = new LinkedList<ScanInfo>();
+        LinkedList<ScanInfo> report = new LinkedList<>();
         File bpFinalizedDir = volume.getFinalizedDir(bpid);
-        result.put(bpid,
-            compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
+
+        perfTimer.start();
+        throttleTimer.start();
+
+        try {
+          result.put(bpid,
+              compileReport(volume, bpFinalizedDir, bpFinalizedDir, report));
+        } catch (InterruptedException ex) {
+          // Exit quickly and flag the scanner to do the same
+          result = null;
+          break;
+        }
       }
       return result;
     }
 
-    /** Compile list {@link ScanInfo} for the blocks in the directory <dir> */
+    /**
+     * Compile a list of {@link ScanInfo} for the blocks in the directory
+     * given by {@code dir}.
+     *
+     * @param vol the volume that contains the directory to scan
+     * @param bpFinalizedDir the root directory of the directory to scan
+     * @param dir the directory to scan
+     * @param report the list onto which blocks reports are placed
+     */
     private LinkedList<ScanInfo> compileReport(FsVolumeSpi vol,
-        File bpFinalizedDir, File dir, LinkedList<ScanInfo> report) {
+        File bpFinalizedDir, File dir, LinkedList<ScanInfo> report)
+        throws InterruptedException {
+
       File[] files;
+
+      throttle();
+
       try {
         files = FileUtil.listFiles(dir);
       } catch (IOException ioe) {
@@ -622,6 +866,12 @@ public class DirectoryScanner implements Runnable {
        * blk_<blockid>_<genstamp>.meta
        */
       for (int i = 0; i < files.length; i++) {
+        // Make sure this thread can make a timely exit. With a low throttle
+        // rate, completing a run can take a looooong time.
+        if (Thread.interrupted()) {
+          throw new InterruptedException();
+        }
+
         if (files[i].isDirectory()) {
           compileReport(vol, bpFinalizedDir, files[i], report);
           continue;
@@ -668,5 +918,40 @@ public class DirectoryScanner implements Runnable {
             + " has to be upgraded to block ID-based layout");
       }
     }
+
+    /**
+     * Called by the thread before each potential disk scan so that a pause
+     * can be optionally inserted to limit the number of scans per second.
+     * The limit is controlled by
+     * {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY}.
+     */
+    private void throttle() throws InterruptedException {
+      accumulateTimeRunning();
+
+      if ((throttleLimitMsPerSec < 1000) &&
+          (throttleTimer.now(TimeUnit.MILLISECONDS) > throttleLimitMsPerSec)) {
+
+        Thread.sleep(MILLIS_PER_SECOND - throttleLimitMsPerSec);
+        throttleTimer.reset().start();
+      }
+
+      accumulateTimeWaiting();
+    }
+
+    /**
+     * Helper method to measure time running.
+     */
+    private void accumulateTimeRunning() {
+      timeRunningMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
+      perfTimer.reset().start();
+    }
+
+    /**
+     * Helper method to measure time waiting.
+     */
+    private void accumulateTimeWaiting() {
+      timeWaitingMs.getAndAdd(perfTimer.now(TimeUnit.MILLISECONDS));
+      perfTimer.reset().start();
+    }
   }
 }

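For context, the throttle() method added above implements a simple duty-cycle
pattern: each report compiler thread tracks wall-clock time since its last
pause and, once it has been running longer than the configured limit, sleeps
for the remainder of the second before resetting its timer. A minimal,
self-contained sketch of that pattern follows; the class and names used here
(SimpleThrottle, limitMsPerSec) are illustrative only and are not part of the
patch.

    // Illustrative sketch (not Hadoop code) of a per-thread duty-cycle throttle.
    public class SimpleThrottle {
      private static final int MILLIS_PER_SECOND = 1000;
      private final int limitMsPerSec; // e.g. 100 => run at most ~100 ms per second
      private long windowStartMs = System.nanoTime() / 1_000_000L;

      public SimpleThrottle(int limitMsPerSec) {
        this.limitMsPerSec = limitMsPerSec;
      }

      // Call before each unit of work; sleeps once the per-second budget is spent.
      public void throttle() throws InterruptedException {
        if (limitMsPerSec <= 0 || limitMsPerSec >= MILLIS_PER_SECOND) {
          return; // throttling disabled
        }
        long nowMs = System.nanoTime() / 1_000_000L;
        if (nowMs - windowStartMs > limitMsPerSec) {
          // Budget for this window is used up: pause for the rest of the second.
          Thread.sleep(MILLIS_PER_SECOND - limitMsPerSec);
          windowStartMs = System.nanoTime() / 1_000_000L;
        }
      }

      public static void main(String[] args) throws InterruptedException {
        SimpleThrottle t = new SimpleThrottle(100);
        for (int i = 0; i < 50; i++) {
          t.throttle();
          Thread.sleep(20); // stand-in for one directory listing (~20 ms of work)
        }
      }
    }

Because the pause only happens between directory listings, a thread can finish
its current listing before blocking, which is why the property description
below notes that actual running time may exceed the limit by a small margin.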
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a3c381b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 77460ef..0c1ad7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -636,6 +636,26 @@
 </property>
 
 <property>
+  <name>dfs.datanode.directoryscan.throttle.limit.ms.per.sec</name>
+  <value>1000</value>
+  <description>The report compilation threads are limited to only running for
+  a given number of milliseconds per second, as configured by the
+  property. The limit is taken per thread, not in aggregate, e.g. setting
+  a limit of 100ms for 4 compiler threads will result in each thread being
+  limited to 100ms, not 25ms.
+
+  Note that the throttle does not interrupt the report compiler threads, so the
+  actual running time of the threads per second will typically be somewhat
+  higher than the throttle limit, usually by no more than 20%.
+
+  Setting this limit to 1000 disables compiler thread throttling. Only
+  values between 1 and 1000 are valid. Setting an invalid value will result
+  in the throttle being disabled and an error message being logged. 1000 is
+  the default setting.
+  </description>
+</property>
+
+<property>
   <name>dfs.heartbeat.interval</name>
   <value>3</value>
   <description>Determines datanode heartbeat interval in seconds.</description>

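As a usage note (not part of this patch): an operator would enable the throttle
by overriding the new property in hdfs-site.xml on the datanodes, for example to
limit each report compiler thread to roughly 100 ms of scanning per second:

    <property>
      <name>dfs.datanode.directoryscan.throttle.limit.ms.per.sec</name>
      <value>100</value>
    </property>

Per the description above, a value of 1000 (or any value outside 1-1000) leaves
the directory scanner unthrottled.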
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7a3c381b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index baf50d8..72c4497 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -33,6 +33,10 @@ import java.nio.channels.FileChannel;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
@@ -56,6 +60,8 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
@@ -84,6 +90,11 @@ public class TestDirectoryScanner {
                  Long.MAX_VALUE);
   }
 
+  @Before
+  public void setup() {
+    LazyPersistTestCase.initCacheManipulator();
+  }
+
   /** create a file with a length of <code>fileLen</code> */
   private List<LocatedBlock> createFile(String fileNamePrefix,
                                         long fileLen,
@@ -311,7 +322,6 @@ public class TestDirectoryScanner {
 
   @Test (timeout=300000)
   public void testRetainBlockOnPersistentStorage() throws Exception {
-    LazyPersistTestCase.initCacheManipulator();
     cluster = new MiniDFSCluster
         .Builder(CONF)
         .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@@ -353,7 +363,6 @@ public class TestDirectoryScanner {
 
   @Test (timeout=300000)
   public void testDeleteBlockOnTransientStorage() throws Exception {
-    LazyPersistTestCase.initCacheManipulator();
     cluster = new MiniDFSCluster
         .Builder(CONF)
         .storageTypes(new StorageType[] { StorageType.RAM_DISK, StorageType.DEFAULT })
@@ -515,7 +524,13 @@ public class TestDirectoryScanner {
       scan(totalBlocks+3, 6, 2, 2, 3, 2);
       scan(totalBlocks+1, 0, 0, 0, 0, 0);
       
-      // Test14: validate clean shutdown of DirectoryScanner
+      // Test14: make sure no throttling is happening
+      assertTrue("Throttle appears to be engaged",
+          scanner.timeWaitingMs.get() < 10L);
+      assertTrue("Report complier threads logged no execution time",
+          scanner.timeRunningMs.get() > 0L);
+
+      // Test15: validate clean shutdown of DirectoryScanner
       ////assertTrue(scanner.getRunStatus()); //assumes "real" FSDataset, not sim
       scanner.shutdown();
       assertFalse(scanner.getRunStatus());
@@ -529,6 +544,219 @@ public class TestDirectoryScanner {
     }
   }
 
+  /**
+   * Test that the timeslice throttle limits the report compiler thread's
+   * execution time correctly.  We test by scanning a large block pool and
+   * comparing the time spent waiting to the time spent running.
+   *
+   * The block pool has to be large, or the ratio will be off.  The throttle
+   * allows the report compiler thread to finish its current cycle when
+   * blocking it, so the ratio will always be a little lower than expected.
+   * The smaller the block pool, the further off the ratio will be.
+   *
+   * @throws Exception thrown on unexpected failure
+   */
+  @Test (timeout=300000)
+  public void testThrottling() throws Exception {
+    Configuration conf = new Configuration(CONF);
+
+    // We need lots of blocks so the report compiler threads have enough to
+    // keep them busy while we watch them.
+    int blocks = 20000;
+    int maxRetries = 3;
+
+    cluster = new MiniDFSCluster.Builder(conf).build();
+
+    try {
+      cluster.waitActive();
+      bpid = cluster.getNamesystem().getBlockPoolId();
+      fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
+      client = cluster.getFileSystem().getClient();
+      conf.setInt(
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          100);
+      DataNode dataNode = cluster.getDataNodes().get(0);
+
+      createFile(GenericTestUtils.getMethodName(),
+          BLOCK_LENGTH * blocks, false);
+
+      float ratio = 0.0f;
+      int retries = maxRetries;
+
+      while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
+        scanner = new DirectoryScanner(dataNode, fds, conf);
+        ratio = runThrottleTest(blocks);
+        retries -= 1;
+      }
+
+      // Waiting should be about 9x running.
+      LOG.info("RATIO: " + ratio);
+      assertTrue("Throttle is too restrictive", ratio <= 10f);
+      assertTrue("Throttle is too permissive", ratio >= 7f);
+
+      // Test with a different limit
+      conf.setInt(
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          200);
+      ratio = 0.0f;
+      retries = maxRetries;
+
+      while ((retries > 0) && ((ratio < 3f) || (ratio > 4.5f))) {
+        scanner = new DirectoryScanner(dataNode, fds, conf);
+        ratio = runThrottleTest(blocks);
+        retries -= 1;
+      }
+
+      // Waiting should be about 4x running.
+      LOG.info("RATIO: " + ratio);
+      assertTrue("Throttle is too restrictive", ratio <= 4.5f);
+      assertTrue("Throttle is too permissive", ratio >= 3.0f);
+
+      // Test with more than 1 thread
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 3);
+      conf.setInt(
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          100);
+      ratio = 0.0f;
+      retries = maxRetries;
+
+      while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) {
+        scanner = new DirectoryScanner(dataNode, fds, conf);
+        ratio = runThrottleTest(blocks);
+        retries -= 1;
+      }
+
+      // Waiting should be about 9x running.
+      LOG.info("RATIO: " + ratio);
+      assertTrue("Throttle is too restrictive", ratio <= 10f);
+      assertTrue("Throttle is too permissive", ratio >= 7f);
+
+      // Test with no limit
+      scanner = new DirectoryScanner(dataNode, fds, CONF);
+      scanner.setRetainDiffs(true);
+      scan(blocks, 0, 0, 0, 0, 0);
+      scanner.shutdown();
+      assertFalse(scanner.getRunStatus());
+
+      assertTrue("Throttle appears to be engaged",
+          scanner.timeWaitingMs.get() < 10L);
+      assertTrue("Report complier threads logged no execution time",
+          scanner.timeRunningMs.get() > 0L);
+
+      // Test with a 1ms limit.  This also tests whether the scanner can be
+      // shutdown cleanly in mid stride.
+      conf.setInt(
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          1);
+      ratio = 0.0f;
+      retries = maxRetries;
+      ScheduledExecutorService interruptor =
+          Executors.newScheduledThreadPool(maxRetries);
+
+      try {
+        while ((retries > 0) && (ratio < 10)) {
+          scanner = new DirectoryScanner(dataNode, fds, conf);
+          scanner.setRetainDiffs(true);
+
+          final AtomicLong nowMs = new AtomicLong();
+
+          // Stop the scanner after 2 seconds because otherwise it will take an
+          // eternity to complete its run
+          interruptor.schedule(new Runnable() {
+            @Override
+            public void run() {
+              scanner.shutdown();
+              nowMs.set(Time.monotonicNow());
+            }
+          }, 2L, TimeUnit.SECONDS);
+
+          scanner.reconcile();
+          assertFalse(scanner.getRunStatus());
+          LOG.info("Scanner took " + (Time.monotonicNow() - nowMs.get())
+              + "ms to shutdown");
+          assertTrue("Scanner took too long to shutdown",
+              Time.monotonicNow() - nowMs.get() < 1000L);
+
+          ratio =
+              (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
+          retries -= 1;
+        }
+      } finally {
+        interruptor.shutdown();
+      }
+
+      // We just want to test that it waits a lot, but it also runs some
+      LOG.info("RATIO: " + ratio);
+      assertTrue("Throttle is too permissive",
+          ratio > 10);
+      assertTrue("Report complier threads logged no execution time",
+          scanner.timeRunningMs.get() > 0L);
+
+      // Test with a 0 limit, i.e. disabled
+      conf.setInt(
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          0);
+      scanner = new DirectoryScanner(dataNode, fds, conf);
+      scanner.setRetainDiffs(true);
+      scan(blocks, 0, 0, 0, 0, 0);
+      scanner.shutdown();
+      assertFalse(scanner.getRunStatus());
+
+      assertTrue("Throttle appears to be engaged",
+          scanner.timeWaitingMs.get() < 10L);
+      assertTrue("Report complier threads logged no execution time",
+          scanner.timeRunningMs.get() > 0L);
+
+      // Test with a 1000 limit, i.e. disabled
+      conf.setInt(
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          1000);
+      scanner = new DirectoryScanner(dataNode, fds, conf);
+      scanner.setRetainDiffs(true);
+      scan(blocks, 0, 0, 0, 0, 0);
+      scanner.shutdown();
+      assertFalse(scanner.getRunStatus());
+
+      assertTrue("Throttle appears to be engaged",
+          scanner.timeWaitingMs.get() < 10L);
+      assertTrue("Report complier threads logged no execution time",
+          scanner.timeRunningMs.get() > 0L);
+
+      // Test that throttle works from regular start
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1);
+      conf.setInt(
+          DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY,
+          10);
+      conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY,
+        1);
+      scanner = new DirectoryScanner(dataNode, fds, conf);
+      scanner.setRetainDiffs(true);
+      scanner.start();
+
+      int count = 50;
+
+      while ((count > 0) && (scanner.timeWaitingMs.get() < 500L)) {
+        Thread.sleep(100L);
+        count -= 1;
+      }
+
+      scanner.shutdown();
+      assertFalse(scanner.getRunStatus());
+      assertTrue("Throttle does not appear to be engaged", count > 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private float runThrottleTest(int blocks) throws IOException {
+    scanner.setRetainDiffs(true);
+    scan(blocks, 0, 0, 0, 0, 0);
+    scanner.shutdown();
+    assertFalse(scanner.getRunStatus());
+
+    return (float)scanner.timeWaitingMs.get() / scanner.timeRunningMs.get();
+  }
+
   private void verifyAddition(long blockId, long genStamp, long size) {
     final ReplicaInfo replicainfo;
     replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);

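As a quick sanity check on the ratios asserted in testThrottling above: with a
100 ms/s limit, each compiler thread should spend roughly 100 ms running and
900 ms waiting per second, so timeWaitingMs / timeRunningMs should land near 9
(the test accepts 7 to 10); with a 200 ms/s limit the expected ratio is
800 / 200 = 4 (accepted range 3 to 4.5). The slack exists because the throttle
only pauses a thread between directory listings, so each thread typically
overruns its per-second budget slightly.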