HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy 
storage policy of all the files under the given dir. Contributed by Surendra 
Singh Lilhore.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/50d23317
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/50d23317
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/50d23317

Branch: refs/heads/HDFS-10285
Commit: 50d23317a4fcdd52615756033fdba792170f9eeb
Parents: 60e9c15
Author: Uma Maheswara Rao G <uma.ganguma...@intel.com>
Authored: Sat Sep 30 06:31:52 2017 -0700
Committer: Rakesh Radhakrishnan <rake...@apache.org>
Committed: Thu Jul 12 17:02:23 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   8 +
 .../java/org/apache/hadoop/hdfs/DFSUtil.java    |  22 +-
 .../BlockStorageMovementAttemptedItems.java     |   8 +-
 .../namenode/BlockStorageMovementNeeded.java    | 277 +++++++++++---
 .../server/namenode/ReencryptionHandler.java    |   1 +
 .../server/namenode/StoragePolicySatisfier.java |  43 ++-
 .../src/main/resources/hdfs-default.xml         |  23 ++
 .../src/site/markdown/ArchivalStorage.md        |   3 +-
 .../TestBlockStorageMovementAttemptedItems.java |   2 +-
 .../TestPersistentStoragePolicySatisfier.java   |   8 +-
 .../namenode/TestStoragePolicySatisfier.java    | 377 ++++++++++++++++++-
 11 files changed, 689 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index cf5206a..c8fa40c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -612,6 +612,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys 
{
       "dfs.storage.policy.satisfier.enabled";
   public static final boolean DFS_STORAGE_POLICY_SATISFIER_ENABLED_DEFAULT =
       false;
+  public static final String  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY =
+      "dfs.storage.policy.satisfier.queue.limit";
+  public static final int  DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT =
+      1000;
+  public static final String DFS_SPS_WORK_MULTIPLIER_PER_ITERATION =
+      "dfs.storage.policy.satisfier.work.multiplier.per.iteration";
+  public static final int DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT =
+      1;
   public static final String 
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY =
       "dfs.storage.policy.satisfier.recheck.timeout.millis";
   public static final int 
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_DEFAULT =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index f5ceeaf..c26599c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -1457,7 +1457,27 @@ public class DFSUtil {
         "It should be a positive, non-zero integer value.");
     return blocksReplWorkMultiplier;
   }
-  
+
+  /**
+   * Get DFS_SPS_WORK_MULTIPLIER_PER_ITERATION from
+   * configuration.
+   *
+   * @param conf Configuration
+   * @return Value of DFS_SPS_WORK_MULTIPLIER_PER_ITERATION
+   */
+  public static int getSPSWorkMultiplier(Configuration conf) {
+    int spsWorkMultiplier = conf
+        .getInt(
+            DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION,
+            DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION_DEFAULT);
+    Preconditions.checkArgument(
+        (spsWorkMultiplier > 0),
+        DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION +
+        " = '" + spsWorkMultiplier + "' is invalid. " +
+        "It should be a positive, non-zero integer value.");
+    return spsWorkMultiplier;
+  }
+
   /**
    * Get SPNEGO keytab Key from configuration
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 278b62b..549819f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -101,7 +101,7 @@ public class BlockStorageMovementAttemptedItems {
   public void add(ItemInfo itemInfo, boolean allBlockLocsAttemptedToSatisfy) {
     synchronized (storageMovementAttemptedItems) {
       AttemptedItemInfo attemptedItemInfo = new AttemptedItemInfo(
-          itemInfo.getRootId(), itemInfo.getTrackId(), monotonicNow(),
+          itemInfo.getStartId(), itemInfo.getTrackId(), monotonicNow(),
           allBlockLocsAttemptedToSatisfy);
       storageMovementAttemptedItems.put(itemInfo.getTrackId(),
           attemptedItemInfo);
@@ -260,7 +260,7 @@ public class BlockStorageMovementAttemptedItems {
           synchronized (storageMovementAttemptedResults) {
             if (!isExistInResult(blockCollectionID)) {
               ItemInfo candidate = new ItemInfo(
-                  itemInfo.getRootId(), blockCollectionID);
+                  itemInfo.getStartId(), blockCollectionID);
               blockStorageMovementNeeded.add(candidate);
               iter.remove();
               LOG.info("TrackID: {} becomes timed out and moved to needed "
@@ -315,7 +315,7 @@ public class BlockStorageMovementAttemptedItems {
           // blockStorageMovementNeeded#removeIteamTrackInfo() for cleaning
           // the xAttr
           ItemInfo itemInfo = new ItemInfo((attemptedItemInfo != null)
-              ? attemptedItemInfo.getRootId() : trackId, trackId);
+              ? attemptedItemInfo.getStartId() : trackId, trackId);
           switch (status) {
           case FAILURE:
             if (attemptedItemInfo != null) {
@@ -345,7 +345,7 @@ public class BlockStorageMovementAttemptedItems {
             if (attemptedItemInfo != null) {
               if (!attemptedItemInfo.isAllBlockLocsAttemptedToSatisfy()) {
                 blockStorageMovementNeeded
-                    .add(new ItemInfo(attemptedItemInfo.getRootId(), trackId));
+                    .add(new ItemInfo(attemptedItemInfo.getStartId(), 
trackId));
                 LOG.warn("{} But adding trackID back to retry queue as some of"
                     + " the blocks couldn't find matching target nodes in"
                     + " previous SPS iteration.", msg);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
index 41a3a6c..788a98b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -29,12 +29,15 @@ import java.util.Map;
 import java.util.Queue;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSTreeTraverser.TraverseInfo;
 import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier.ItemInfo;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.util.Daemon;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * A Class to track the block collection IDs (Inode's ID) for which physical
  * storage movement needed as per the Namespace and StorageReports from DN.
@@ -53,11 +56,11 @@ public class BlockStorageMovementNeeded {
       new LinkedList<ItemInfo>();
 
   /**
-   * Map of rootId and number of child's. Number of child's indicate the number
-   * of files pending to satisfy the policy.
+   * Map of startId and the number of children. The child count indicates the
+   * number of files pending to satisfy the policy.
    */
-  private final Map<Long, Integer> pendingWorkForDirectory =
-      new HashMap<Long, Integer>();
+  private final Map<Long, DirPendingWorkInfo> pendingWorkForDirectory =
+      new HashMap<Long, DirPendingWorkInfo>();
 
   private final Namesystem namesystem;
 
@@ -66,12 +69,15 @@ public class BlockStorageMovementNeeded {
 
   private final StoragePolicySatisfier sps;
 
-  private Daemon fileInodeIdCollector;
+  private Daemon inodeIdCollector;
+
+  private final int maxQueuedItem;
 
   public BlockStorageMovementNeeded(Namesystem namesystem,
-      StoragePolicySatisfier sps) {
+      StoragePolicySatisfier sps, int queueLimit) {
     this.namesystem = namesystem;
     this.sps = sps;
+    this.maxQueuedItem = queueLimit;
   }
 
   /**
@@ -88,15 +94,24 @@ public class BlockStorageMovementNeeded {
   /**
    * Add the itemInfo to tracking list for which storage movement
    * expected if necessary.
-   * @param rootId
-   *            - root inode id
+   * @param startId
+   *            - start id
    * @param itemInfoList
    *            - List of child in the directory
    */
-  private synchronized void addAll(Long rootId,
-      List<ItemInfo> itemInfoList) {
+  @VisibleForTesting
+  public synchronized void addAll(long startId,
+      List<ItemInfo> itemInfoList, boolean scanCompleted) {
     storageMovementNeeded.addAll(itemInfoList);
-    pendingWorkForDirectory.put(rootId, itemInfoList.size());
+    DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+    if (pendingWork == null) {
+      pendingWork = new DirPendingWorkInfo();
+      pendingWorkForDirectory.put(startId, pendingWork);
+    }
+    pendingWork.addPendingWorkCount(itemInfoList.size());
+    if (scanCompleted) {
+      pendingWork.markScanCompleted();
+    }
   }
 
   /**
@@ -118,6 +133,25 @@ public class BlockStorageMovementNeeded {
     }
   }
 
+  /**
+   * Returns the remaining capacity of the queue.
+   */
+  public synchronized int remainingCapacity() {
+    int size = storageMovementNeeded.size();
+    if (size >= maxQueuedItem) {
+      return 0;
+    } else {
+      return (maxQueuedItem - size);
+    }
+  }
+
+  /**
+   * Returns queue size.
+   */
+  public synchronized int size() {
+    return storageMovementNeeded.size();
+  }
+
   public synchronized void clearAll() {
     spsDirsToBeTraveresed.clear();
     storageMovementNeeded.clear();
@@ -131,20 +165,20 @@ public class BlockStorageMovementNeeded {
   public synchronized void removeItemTrackInfo(ItemInfo trackInfo)
       throws IOException {
     if (trackInfo.isDir()) {
-      // If track is part of some root then reduce the pending directory work
-      // count.
-      long rootId = trackInfo.getRootId();
-      INode inode = namesystem.getFSDirectory().getInode(rootId);
+      // If track is part of some start inode then reduce the pending
+      // directory work count.
+      long startId = trackInfo.getStartId();
+      INode inode = namesystem.getFSDirectory().getInode(startId);
       if (inode == null) {
         // directory deleted just remove it.
-        this.pendingWorkForDirectory.remove(rootId);
+        this.pendingWorkForDirectory.remove(startId);
       } else {
-        if (pendingWorkForDirectory.get(rootId) != null) {
-          Integer pendingWork = pendingWorkForDirectory.get(rootId) - 1;
-          pendingWorkForDirectory.put(rootId, pendingWork);
-          if (pendingWork <= 0) {
-            namesystem.removeXattr(rootId, XATTR_SATISFY_STORAGE_POLICY);
-            pendingWorkForDirectory.remove(rootId);
+        DirPendingWorkInfo pendingWork = pendingWorkForDirectory.get(startId);
+        if (pendingWork != null) {
+          pendingWork.decrementPendingWorkCount();
+          if (pendingWork.isDirWorkDone()) {
+            namesystem.removeXattr(startId, XATTR_SATISFY_STORAGE_POLICY);
+            pendingWorkForDirectory.remove(startId);
           }
         }
       }
@@ -161,7 +195,7 @@ public class BlockStorageMovementNeeded {
     Iterator<ItemInfo> iterator = storageMovementNeeded.iterator();
     while (iterator.hasNext()) {
       ItemInfo next = iterator.next();
-      if (next.getRootId() == trackId) {
+      if (next.getStartId() == trackId) {
         iterator.remove();
       }
     }
@@ -208,7 +242,17 @@ public class BlockStorageMovementNeeded {
    * Take dir tack ID from the spsDirsToBeTraveresed queue and collect child
    * ID's to process for satisfy the policy.
    */
-  private class FileInodeIdCollector implements Runnable {
+  private class StorageMovementPendingInodeIdCollector extends FSTreeTraverser
+      implements Runnable {
+
+    private int remainingCapacity = 0;
+
+    private List<ItemInfo> currentBatch = new ArrayList<>(maxQueuedItem);
+
+    StorageMovementPendingInodeIdCollector(FSDirectory dir) {
+      super(dir);
+    }
+
     @Override
     public void run() {
       LOG.info("Starting FileInodeIdCollector!.");
@@ -216,38 +260,36 @@ public class BlockStorageMovementNeeded {
         try {
           if (!namesystem.isInSafeMode()) {
             FSDirectory fsd = namesystem.getFSDirectory();
-            Long rootINodeId = spsDirsToBeTraveresed.poll();
-            if (rootINodeId == null) {
+            Long startINodeId = spsDirsToBeTraveresed.poll();
+            if (startINodeId == null) {
               // Waiting for SPS path
               synchronized (spsDirsToBeTraveresed) {
                 spsDirsToBeTraveresed.wait(5000);
               }
             } else {
-              INode rootInode = fsd.getInode(rootINodeId);
-              if (rootInode != null) {
-                // TODO : HDFS-12291
-                // 1. Implement an efficient recursive directory iteration
-                // mechanism and satisfies storage policy for all the files
-                // under the given directory.
-                // 2. Process files in batches,so datanodes workload can be
-                // handled.
-                List<ItemInfo> itemInfoList =
-                    new ArrayList<>();
-                for (INode childInode : rootInode.asDirectory()
-                    .getChildrenList(Snapshot.CURRENT_STATE_ID)) {
-                  if (childInode.isFile()
-                      && childInode.asFile().numBlocks() != 0) {
-                    itemInfoList.add(
-                        new ItemInfo(rootINodeId, childInode.getId()));
-                  }
+              INode startInode = fsd.getInode(startINodeId);
+              if (startInode != null) {
+                try {
+                  remainingCapacity = remainingCapacity();
+                  readLock();
+                  traverseDir(startInode.asDirectory(), startINodeId,
+                      HdfsFileStatus.EMPTY_NAME,
+                      new SPSTraverseInfo(startINodeId));
+                } finally {
+                  readUnlock();
                 }
-                if (itemInfoList.isEmpty()) {
-                  // satisfy track info is empty, so remove the xAttr from the
-                  // directory
-                  namesystem.removeXattr(rootINodeId,
+                // Mark the traversal of startInode as done
+                addAll(startInode.getId(), currentBatch, true);
+                currentBatch.clear();
+
+                // Check whether the directory was empty and no child was added
+                DirPendingWorkInfo dirPendingWorkInfo =
+                    pendingWorkForDirectory.get(startInode.getId());
+                if (dirPendingWorkInfo.isDirWorkDone()) {
+                  namesystem.removeXattr(startInode.getId(),
                       XATTR_SATISFY_STORAGE_POLICY);
+                  pendingWorkForDirectory.remove(startInode.getId());
                 }
-                addAll(rootINodeId, itemInfoList);
               }
             }
           }
@@ -256,17 +298,140 @@ public class BlockStorageMovementNeeded {
         }
       }
     }
+
+    @Override
+    protected void checkPauseForTesting() throws InterruptedException {
+      // TODO implement if needed
+    }
+
+    @Override
+    protected boolean processFileInode(INode inode, TraverseInfo traverseInfo)
+        throws IOException, InterruptedException {
+      assert getFSDirectory().hasReadLock();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Processing {} for statisy the policy",
+            inode.getFullPathName());
+      }
+      if (!inode.isFile()) {
+        return false;
+      }
+      if (inode.isFile() && inode.asFile().numBlocks() != 0) {
+        currentBatch.add(new ItemInfo(
+            ((SPSTraverseInfo) traverseInfo).getStartId(), inode.getId()));
+        remainingCapacity--;
+      }
+      return true;
+    }
+
+    @Override
+    protected boolean canSubmitCurrentBatch() {
+      return remainingCapacity <= 0;
+    }
+
+    @Override
+    protected void checkINodeReady(long startId) throws IOException {
+      FSNamesystem fsn = ((FSNamesystem) namesystem);
+      fsn.checkNameNodeSafeMode("NN is in safe mode,"
+          + " cannot satisfy the policy.");
+      // SPS work should be cancelled when NN goes to standby. Just
+      // double checking for sanity.
+      fsn.checkOperation(NameNode.OperationCategory.WRITE);
+    }
+
+    @Override
+    protected void submitCurrentBatch(long startId)
+        throws IOException, InterruptedException {
+      // Add the current batch of children to the queue
+      addAll(startId, currentBatch, false);
+      currentBatch.clear();
+    }
+
+    @Override
+    protected void throttle() throws InterruptedException {
+      assert !getFSDirectory().hasReadLock();
+      assert !namesystem.hasReadLock();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("StorageMovementNeeded queue remaining capacity is zero,"
+            + " waiting for some free slots.");
+      }
+      remainingCapacity = remainingCapacity();
+      // wait for queue to be free
+      while (remainingCapacity <= 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Waiting for storageMovementNeeded queue to be free!");
+        }
+        Thread.sleep(5000);
+        remainingCapacity = remainingCapacity();
+      }
+    }
+
+    @Override
+    protected boolean canTraverseDir(INode inode) throws IOException {
+      return true;
+    }
   }
 
-  public void start() {
-    fileInodeIdCollector = new Daemon(new FileInodeIdCollector());
-    fileInodeIdCollector.setName("FileInodeIdCollector");
-    fileInodeIdCollector.start();
+  /**
+   * Info for directory recursive scan.
+   */
+  public static class DirPendingWorkInfo {
+
+    private int pendingWorkCount = 0;
+    private boolean fullyScanned = false;
+
+    /**
+     * Increment the pending work count for directory.
+     */
+    public synchronized void addPendingWorkCount(int count) {
+      this.pendingWorkCount = this.pendingWorkCount + count;
+    }
+
+    /**
+     * Decrement the pending work count for the directory when one track
+     * info is completed.
+     */
+    public synchronized void decrementPendingWorkCount() {
+      this.pendingWorkCount--;
+    }
+
+    /**
+     * Return true if all the pending work is done and the directory has been
+     * fully scanned, otherwise false.
+     */
+    public synchronized boolean isDirWorkDone() {
+      return (pendingWorkCount <= 0 && fullyScanned);
+    }
+
+    /**
+     * Mark the directory scan as completed.
+     */
+    public synchronized void markScanCompleted() {
+      this.fullyScanned = true;
+    }
   }
 
-  public void stop() {
-    if (fileInodeIdCollector != null) {
-      fileInodeIdCollector.interrupt();
+  public void init() {
+    inodeIdCollector = new Daemon(new StorageMovementPendingInodeIdCollector(
+        namesystem.getFSDirectory()));
+    inodeIdCollector.setName("FileInodeIdCollector");
+    inodeIdCollector.start();
+  }
+
+  public void close() {
+    if (inodeIdCollector != null) {
+      inodeIdCollector.interrupt();
+    }
+  }
+
+  class SPSTraverseInfo extends TraverseInfo {
+    private long startId;
+
+    SPSTraverseInfo(long startId) {
+      this.startId = startId;
+    }
+
+    public long getStartId() {
+      return startId;
     }
   }
 }
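
The new DirPendingWorkInfo bookkeeping above only allows the directory's
XATTR_SATISFY_STORAGE_POLICY to be cleared once the recursive scan has
finished and every queued child item has been accounted for. A minimal
sketch of that lifecycle (the class name SpsPendingWorkSketch and the
counts are illustrative, not part of this patch):

import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded.DirPendingWorkInfo;

public class SpsPendingWorkSketch {
  public static void main(String[] args) {
    DirPendingWorkInfo work = new DirPendingWorkInfo();
    work.addPendingWorkCount(3);       // first batch of 3 child files queued
    work.decrementPendingWorkCount();  // one child satisfied
    System.out.println(work.isDirWorkDone());  // false: scan not finished yet
    work.markScanCompleted();          // traversal of the directory finished
    work.decrementPendingWorkCount();
    work.decrementPendingWorkCount();  // remaining children satisfied
    System.out.println(work.isDirWorkDone());  // true: xattr can be removed
  }
}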

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
index b92fe9f..feacd74 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ReencryptionHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 48d0598..a4372d5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -77,7 +77,8 @@ public class StoragePolicySatisfier implements Runnable {
   private final BlockStorageMovementNeeded storageMovementNeeded;
   private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
   private volatile boolean isRunning = false;
-
+  private int spsWorkMultiplier;
+  private long blockCount = 0L;
   /**
    * Represents the collective analysis status for all blocks.
    */
@@ -106,7 +107,9 @@ public class StoragePolicySatisfier implements Runnable {
       final BlockManager blkManager, Configuration conf) {
     this.namesystem = namesystem;
     this.storageMovementNeeded = new BlockStorageMovementNeeded(namesystem,
-        this);
+        this, conf.getInt(
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+            DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_DEFAULT));
     this.blockManager = blkManager;
     this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
         conf.getLong(
@@ -117,6 +120,7 @@ public class StoragePolicySatisfier implements Runnable {
             
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
         storageMovementNeeded,
         this);
+    this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(conf);
   }
 
   /**
@@ -143,7 +147,7 @@ public class StoragePolicySatisfier implements Runnable {
     // Ensure that all the previously submitted block movements(if any) have to
     // be stopped in all datanodes.
     addDropSPSWorkCommandsToAllDNs();
-    storageMovementNeeded.start();
+    storageMovementNeeded.init();
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
@@ -164,7 +168,7 @@ public class StoragePolicySatisfier implements Runnable {
       return;
     }
 
-    storageMovementNeeded.stop();
+    storageMovementNeeded.close();
 
     storagePolicySatisfierThread.interrupt();
     this.storageMovementsMonitor.stop();
@@ -268,9 +272,13 @@ public class StoragePolicySatisfier implements Runnable {
             }
           }
         }
-        // TODO: We can think to make this as configurable later, how 
frequently
-        // we want to check block movements.
-        Thread.sleep(3000);
+        int numLiveDn = namesystem.getFSDirectory().getBlockManager()
+            .getDatanodeManager().getNumLiveDataNodes();
+        if (storageMovementNeeded.size() == 0
+            || blockCount > (numLiveDn * spsWorkMultiplier)) {
+          Thread.sleep(3000);
+          blockCount = 0L;
+        }
       } catch (Throwable t) {
         handleException(t);
       }
@@ -380,6 +388,11 @@ public class StoragePolicySatisfier implements Runnable {
 
     assignBlockMovingInfosToCoordinatorDn(blockCollection.getId(),
         blockMovingInfos, coordinatorNode);
+    int count = 0;
+    for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
+      count = count + blkMovingInfo.getSources().length;
+    }
+    blockCount = blockCount + count;
     return status;
   }
 
@@ -840,7 +853,7 @@ public class StoragePolicySatisfier implements Runnable {
    *          - file inode/blockcollection id.
    */
   public void satisfyStoragePolicy(Long inodeId) {
-    //For file rootId and trackId is same
+    // For a file, startId and trackId are the same
     storageMovementNeeded.add(new ItemInfo(inodeId, inodeId));
     if (LOG.isDebugEnabled()) {
       LOG.debug("Added track info for inode {} to block "
@@ -864,19 +877,19 @@ public class StoragePolicySatisfier implements Runnable {
    * policy.
    */
   public static class ItemInfo {
-    private long rootId;
+    private long startId;
     private long trackId;
 
-    public ItemInfo(long rootId, long trackId) {
-      this.rootId = rootId;
+    public ItemInfo(long startId, long trackId) {
+      this.startId = startId;
       this.trackId = trackId;
     }
 
     /**
-     * Return the root of the current track Id.
+     * Return the start inode id of the current track Id.
      */
-    public long getRootId() {
-      return rootId;
+    public long getStartId() {
+      return startId;
     }
 
     /**
@@ -890,7 +903,7 @@ public class StoragePolicySatisfier implements Runnable {
      * Returns true if the tracking path is a directory, false otherwise.
      */
     public boolean isDir() {
-      return (rootId != trackId);
+      return (startId != trackId);
     }
   }
 }
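
For context on the pacing change above, a hedged sketch of the new check in
StoragePolicySatisfier#run() (SpsPacingSketch and the sample numbers are
illustrative only): the satisfier keeps scheduling block moves until the
count issued in the current iteration exceeds liveDatanodes * multiplier,
or the pending queue drains, and then it sleeps for 3 seconds and resets
its counter.

public class SpsPacingSketch {
  // Mirrors the condition added to the run() loop in this patch.
  static boolean shouldPause(long blocksScheduled, int liveDatanodes,
      int workMultiplier, int pendingQueueSize) {
    return pendingQueueSize == 0
        || blocksScheduled > (long) liveDatanodes * workMultiplier;
  }

  public static void main(String[] args) {
    // 10 live DNs with the default multiplier of 1 => budget of 10 block
    // moves per iteration before the satisfier pauses.
    System.out.println(shouldPause(11, 10, 1, 42)); // true  -> sleep 3s
    System.out.println(shouldPause(5, 10, 1, 42));  // false -> keep scheduling
  }
}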

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index eaf6bda..e0ca175 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4499,6 +4499,29 @@
 </property>
 
 <property>
+  <name>dfs.storage.policy.satisfier.queue.limit</name>
+  <value>1000</value>
+  <description>
+    Storage policy satisfier queue size. This queue contains the inode IDs of
+    the files currently scheduled for satisfying the storage policy.
+    Default value is 1000.
+  </description>
+</property>
+
+<property>
+  <name>dfs.storage.policy.satisfier.work.multiplier.per.iteration</name>
+  <value>1</value>
+  <description>
+    *Note*: Advanced property. Change with caution.
+    This determines the total number of block transfers to begin in one
+    iteration of the storage policy satisfier. The actual number is obtained
+    by multiplying this multiplier with the total number of live datanodes in
+    the cluster. The resulting number is the number of blocks to begin
+    transferring immediately. This value must be a positive, non-zero integer.
+  </description>
+</property>
+
+<property>
   <name>dfs.storage.policy.satisfier.recheck.timeout.millis</name>
   <value>300000</value>
   <description>

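A short sketch of wiring up the two new properties described above from Java
instead of hdfs-site.xml (the class name SpsConfigSketch and the values 500
and 2 are illustrative): the multiplier is read back through the new
DFSUtil#getSPSWorkMultiplier(), which rejects non-positive values.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;

public class SpsConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Cap the in-memory SPS queue at 500 pending file items (default 1000).
    conf.setInt(
        DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, 500);
    // Allow up to 2 x (live datanodes) block transfers per SPS iteration.
    conf.setInt(DFSConfigKeys.DFS_SPS_WORK_MULTIPLIER_PER_ITERATION, 2);
    // Throws IllegalArgumentException for zero or negative values.
    int multiplier = DFSUtil.getSPSWorkMultiplier(conf);
    System.out.println("SPS work multiplier = " + multiplier);
  }
}
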
http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md 
b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index f6bbd10..c8a9466 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -112,7 +112,7 @@ SPS can be enabled and disabled dynamically without 
restarting the Namenode.
 
 Detailed design documentation can be found at [Storage Policy Satisfier(SPS) 
(HDFS-10285)](https://issues.apache.org/jira/browse/HDFS-10285)
 
-* **Note**: When user invokes `satisfyStoragePolicy()` API on a directory, SPS 
will consider the files which are immediate to that directory. Sub-directories 
won't be considered for satisfying the policy. Its user responsibility to call 
this API on directories recursively, to track all files under the sub tree.
+* **Note**: When a user invokes the `satisfyStoragePolicy()` API on a directory, 
SPS will scan all sub-directories and consider all the files under them for 
satisfying the policy.
 
 * HdfsAdmin API :
         `public void satisfyStoragePolicy(final Path path) throws IOException`
@@ -214,7 +214,6 @@ Get the storage policy of a file or a directory.
 ### Satisfy Storage Policy
 
 Schedule blocks to move based on file's/directory's current storage policy.
-Note: For directory case, it will consider immediate files under that 
directory and it will not consider sub directories recursively.
 
 * Command:
 

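A minimal client-side sketch of the behaviour change documented above (the
NameNode URI, the /archive path and the class name SatisfyPolicySketch are
placeholders): after this patch a single satisfyStoragePolicy() call on a
directory covers the whole sub-tree.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsAdmin;

public class SatisfyPolicySketch {
  public static void main(String[] args) throws Exception {
    HdfsAdmin admin = new HdfsAdmin(URI.create("hdfs://namenode:8020"),
        new Configuration());
    // Mark the desired policy on the directory, then ask SPS to satisfy it.
    admin.setStoragePolicy(new Path("/archive"), "COLD");
    // With HDFS-12291, this schedules block moves for every file in the
    // sub-tree under /archive, not only its immediate children.
    admin.satisfyStoragePolicy(new Path("/archive"));
  }
}
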
http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 55ebf9c..7918821 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -41,7 +41,7 @@ public class TestBlockStorageMovementAttemptedItems {
   public void setup() throws Exception {
     unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
         Mockito.mock(Namesystem.class),
-        Mockito.mock(StoragePolicySatisfier.class));
+        Mockito.mock(StoragePolicySatisfier.class), 100);
     StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
         selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index e7b9148..5bce296 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -191,7 +191,7 @@ public class TestPersistentStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
       DFSTestUtil.waitExpectedStorageType(
-          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+          childFileName, StorageType.ARCHIVE, 3, timeout, fs);
 
     } finally {
       clusterShutdown();
@@ -232,7 +232,9 @@ public class TestPersistentStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
       DFSTestUtil.waitExpectedStorageType(
-          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+          childFileName, StorageType.DISK, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          childFileName, StorageType.ARCHIVE, 2, timeout, fs);
     } finally {
       clusterShutdown();
     }
@@ -269,7 +271,7 @@ public class TestPersistentStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
       DFSTestUtil.waitExpectedStorageType(
-          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+          childFileName, StorageType.ARCHIVE, 3, timeout, fs);
     } finally {
       clusterShutdown();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50d23317/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 3375590..57e9f94 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -21,6 +21,9 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE
 import static 
org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -61,8 +64,10 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 import com.google.common.base.Supplier;
 
@@ -71,6 +76,12 @@ import com.google.common.base.Supplier;
  * moved and finding its suggested target locations to move.
  */
 public class TestStoragePolicySatisfier {
+
+  {
+    GenericTestUtils.setLogLevel(
+        getLogger(FSTreeTraverser.class), Level.DEBUG);
+  }
+
   private static final String ONE_SSD = "ONE_SSD";
   private static final String COLD = "COLD";
   private static final Logger LOG =
@@ -341,7 +352,9 @@ public class TestStoragePolicySatisfier {
 
       // take no effect for the sub-dir's file in the directory.
       DFSTestUtil.waitExpectedStorageType(
-          subFile2, StorageType.DEFAULT, 3, 30000, dfs);
+          subFile2, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile2, StorageType.DISK, 2, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -1083,6 +1096,368 @@ public class TestStoragePolicySatisfier {
     }
   }
 
+  /**
+   * Test SPS for an empty directory; the xAttr should be removed.
+   */
+  @Test(timeout = 300000)
+  public void testSPSForEmptyDirectory() throws IOException, TimeoutException,
+      InterruptedException {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path emptyDir = new Path("/emptyDir");
+      fs.mkdirs(emptyDir);
+      fs.satisfyStoragePolicy(emptyDir);
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved("/emptyDir",
+          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for a directory that does not exist.
+   */
+  @Test(timeout = 300000)
+  public void testSPSForNonExistDirectory() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path emptyDir = new Path("/emptyDir");
+      try {
+        fs.satisfyStoragePolicy(emptyDir);
+        fail("FileNotFoundException should throw");
+      } catch (FileNotFoundException e) {
+        // nothing to do
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for directory tree which doesn't have files.
+   */
+  @Test(timeout = 300000)
+  public void testSPSWithDirectoryTreeWithoutFile() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      // Create directories
+      /*
+       *                   root
+       *                    |
+       *           A--------C--------D
+       *                    |
+       *               G----H----I
+       *                    |
+       *                    O
+       */
+      DistributedFileSystem fs = cluster.getFileSystem();
+      fs.mkdirs(new Path("/root/C/H/O"));
+      fs.mkdirs(new Path("/root/A"));
+      fs.mkdirs(new Path("/root/D"));
+      fs.mkdirs(new Path("/root/C/G"));
+      fs.mkdirs(new Path("/root/C/I"));
+      fs.satisfyStoragePolicy(new Path("/root"));
+      // Make sure satisfy xattr has been removed.
+      DFSTestUtil.waitForXattrRemoved("/root",
+          XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test SPS for directory which has multilevel directories.
+   */
+  @Test(timeout = 300000)
+  public void testMultipleLevelDirectoryForSatisfyStoragePolicy()
+      throws Exception {
+    try {
+      StorageType[][] diskTypes = new StorageType[][] {
+          {StorageType.DISK, StorageType.ARCHIVE},
+          {StorageType.ARCHIVE, StorageType.SSD},
+          {StorageType.DISK, StorageType.DISK}};
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          storagesPerDatanode, capacity);
+      dfs = hdfsCluster.getFileSystem();
+      createDirectoryTree(dfs);
+
+      List<String> files = getDFSListOfTree();
+      dfs.setStoragePolicy(new Path("/root"), COLD);
+      dfs.satisfyStoragePolicy(new Path("/root"));
+      for (String fileName : files) {
+        // Wait till the block is moved to ARCHIVE
+        DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+            30000, dfs);
+      }
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+  /**
+   * Test SPS for batch processing.
+   */
+  @Test(timeout = 300000)
+  public void testBatchProcessingForSPSDirectory() throws Exception {
+    try {
+      StorageType[][] diskTypes = new StorageType[][] {
+          {StorageType.DISK, StorageType.ARCHIVE},
+          {StorageType.ARCHIVE, StorageType.SSD},
+          {StorageType.DISK, StorageType.DISK}};
+      config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+      config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
+          true);
+      // Set queue max capacity
+      config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY,
+          5);
+      hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+          storagesPerDatanode, capacity);
+      dfs = hdfsCluster.getFileSystem();
+      createDirectoryTree(dfs);
+      List<String> files = getDFSListOfTree();
+      LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory
+          .getLog(FSTreeTraverser.class));
+
+      dfs.setStoragePolicy(new Path("/root"), COLD);
+      dfs.satisfyStoragePolicy(new Path("/root"));
+      for (String fileName : files) {
+        // Wait till the block is moved to ARCHIVE
+        DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2,
+            30000, dfs);
+      }
+      waitForBlocksMovementResult(files.size(), 30000);
+      String expectedLogMessage = "StorageMovementNeeded queue remaining"
+          + " capacity is zero";
+      assertTrue("Log output does not contain expected log message: "
+          + expectedLogMessage, logs.getOutput().contains(expectedLogMessage));
+    } finally {
+      shutdownCluster();
+    }
+  }
+
+
+  /**
+   *  Test traversal when a parent directory got deleted.
+   *  1. Delete /root/D/L while traversing Q.
+   *  2. U, R and S should not be queued.
+   */
+  @Test
+  public void testTraverseWhenParentDeleted() throws Exception {
+    StorageType[][] diskTypes = new StorageType[][] {
+        {StorageType.DISK, StorageType.ARCHIVE},
+        {StorageType.ARCHIVE, StorageType.SSD},
+        {StorageType.DISK, StorageType.DISK}};
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+        storagesPerDatanode, capacity);
+    dfs = hdfsCluster.getFileSystem();
+    createDirectoryTree(dfs);
+
+    List<String> expectedTraverseOrder = getDFSListOfTree();
+
+    // Remove files which will not be traversed when the parent is deleted
+    expectedTraverseOrder.remove("/root/D/L/R");
+    expectedTraverseOrder.remove("/root/D/L/S");
+    expectedTraverseOrder.remove("/root/D/L/Q/U");
+    FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
+
+    // The queue limit makes the traversal logic wait for free entries in the
+    // queue. After 10 files, traversal will be paused at U.
+    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
+    Mockito.when(sps.isRunning()).thenReturn(true);
+    BlockStorageMovementNeeded movmentNeededQueue =
+        new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+    INode rootINode = fsDir.getINode("/root");
+    movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+    movmentNeededQueue.init();
+
+    //Wait for thread to reach U.
+    Thread.sleep(1000);
+
+    dfs.delete(new Path("/root/D/L"), true);
+
+    // Remove 10 elements to free the queue, so the remaining traversal starts.
+    for (int i = 0; i < 10; i++) {
+      String path = expectedTraverseOrder.remove(0);
+      long trackId = movmentNeededQueue.get().getTrackId();
+      INode inode = fsDir.getInode(trackId);
+      assertTrue("Failed to traverse tree, expected " + path + " but got "
+          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+    }
+    // Wait for the tree traversal to finish
+    Thread.sleep(5000);
+
+    // Check that the remaining elements were traversed in order; R and S,
+    // which were removed from the expected list, should not have been queued.
+    for (String path : expectedTraverseOrder) {
+      long trackId = movmentNeededQueue.get().getTrackId();
+      INode inode = fsDir.getInode(trackId);
+      assertTrue("Failed to traverse tree, expected " + path + " but got "
+          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+    }
+    dfs.delete(new Path("/root"), true);
+  }
+
+  /**
+   *  Test traverse when root parent got deleted.
+   *  1. Delete L when traversing Q
+   *  2. E, M, U, R and S should not be queued.
+   */
+  @Test
+  public void testTraverseWhenRootParentDeleted() throws Exception {
+    StorageType[][] diskTypes = new StorageType[][] {
+        {StorageType.DISK, StorageType.ARCHIVE},
+        {StorageType.ARCHIVE, StorageType.SSD},
+        {StorageType.DISK, StorageType.DISK}};
+    config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    hdfsCluster = startCluster(config, diskTypes, diskTypes.length,
+        storagesPerDatanode, capacity);
+    dfs = hdfsCluster.getFileSystem();
+    createDirectoryTree(dfs);
+
+    List<String> expectedTraverseOrder = getDFSListOfTree();
+
+    // Remove files which will not be traversed when the parent is deleted
+    expectedTraverseOrder.remove("/root/D/L/R");
+    expectedTraverseOrder.remove("/root/D/L/S");
+    expectedTraverseOrder.remove("/root/D/L/Q/U");
+    expectedTraverseOrder.remove("/root/D/M");
+    expectedTraverseOrder.remove("/root/E");
+    FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory();
+    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
+    Mockito.when(sps.isRunning()).thenReturn(true);
+    // The queue limit makes the traversal logic wait for free entries in the
+    // queue. After 10 files, traversal will be paused at U.
+    BlockStorageMovementNeeded movmentNeededQueue =
+        new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10);
+    movmentNeededQueue.init();
+    INode rootINode = fsDir.getINode("/root");
+    movmentNeededQueue.addToPendingDirQueue(rootINode.getId());
+    // Wait for thread to reach U.
+    Thread.sleep(1000);
+
+    dfs.delete(new Path("/root/D/L"), true);
+
+    // Remove 10 elements to free the queue, so the remaining traversal starts.
+    for (int i = 0; i < 10; i++) {
+      String path = expectedTraverseOrder.remove(0);
+      long trackId = movmentNeededQueue.get().getTrackId();
+      INode inode = fsDir.getInode(trackId);
+      assertTrue("Failed to traverse tree, expected " + path + " but got "
+          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+    }
+    // Wait for the tree traversal to finish
+    Thread.sleep(5000);
+
+    // Check that the remaining elements were traversed in order; E, M, U, R
+    // and S, which were removed from the expected list, should not have been
+    // queued.
+    for (String path : expectedTraverseOrder) {
+      long trackId = movmentNeededQueue.get().getTrackId();
+      INode inode = fsDir.getInode(trackId);
+      assertTrue("Failed to traverse tree, expected " + path + " but got "
+          + inode.getFullPathName(), path.equals(inode.getFullPathName()));
+    }
+    dfs.delete(new Path("/root"), true);
+  }
+
+  private static void createDirectoryTree(DistributedFileSystem dfs)
+      throws Exception {
+    // tree structure
+    /*
+     *                           root
+     *                             |
+     *           A--------B--------C--------D--------E
+     *                    |                 |
+     *          F----G----H----I       J----K----L----M
+     *               |                           |
+     *          N----O----P                 Q----R----S
+     *                    |                 |
+     *                    T                 U
+     */
+    // create root Node and child
+    dfs.mkdirs(new Path("/root"));
+    DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B"));
+    DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/D"));
+    DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0);
+
+    // Create /root/B child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B/G"));
+    DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0);
+
+    // Create /root/D child
+    DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/D/L"));
+    DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0);
+
+    // Create /root/B/G child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0);
+    dfs.mkdirs(new Path("/root/B/G/P"));
+
+    // Create /root/D/L child
+    dfs.mkdirs(new Path("/root/D/L/Q"));
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0);
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0);
+
+    // Create /root/B/G/P child
+    DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0);
+
+    // Create /root/D/L/Q child
+    DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0);
+  }
+
+  private List<String> getDFSListOfTree() {
+    List<String> dfsList = new ArrayList<>();
+    dfsList.add("/root/A");
+    dfsList.add("/root/B/F");
+    dfsList.add("/root/B/G/N");
+    dfsList.add("/root/B/G/O");
+    dfsList.add("/root/B/G/P/T");
+    dfsList.add("/root/B/H");
+    dfsList.add("/root/B/I");
+    dfsList.add("/root/C");
+    dfsList.add("/root/D/J");
+    dfsList.add("/root/D/K");
+    dfsList.add("/root/D/L/Q/U");
+    dfsList.add("/root/D/L/R");
+    dfsList.add("/root/D/L/S");
+    dfsList.add("/root/D/M");
+    dfsList.add("/root/E");
+    return dfsList;
+  }
+
   private String createFileAndSimulateFavoredNodes(int favoredNodesCount)
       throws IOException {
     ArrayList<DataNode> dns = hdfsCluster.getDataNodes();

